diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 4dda11ad..ecd26d1a 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -806,36 +806,6 @@ pub(crate) async fn invite_helper<'a>( ); let state_lock = mutex_state.lock().await; - let prev_events: Vec<_> = db - .rooms - .get_pdu_leaves(room_id)? - .into_iter() - .take(20) - .collect(); - - let create_event = db - .rooms - .room_state_get(room_id, &StateEventType::RoomCreate, "")?; - - let create_event_content: Option = create_event - .as_ref() - .map(|create_event| { - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - Error::bad_database("Invalid create event in db.") - }) - }) - .transpose()?; - - // If there was no create event yet, assume we are creating a room with the default - // version right now - let room_version_id = create_event_content - .map_or(db.globals.default_room_version(), |create_event| { - create_event.room_version - }); - let room_version = - RoomVersion::new(&room_version_id).expect("room version is supported"); - let content = to_raw_value(&RoomMemberEventContent { avatar_url: None, displayname: None, @@ -851,98 +821,7 @@ pub(crate) async fn invite_helper<'a>( let state_key = user_id.to_string(); let kind = StateEventType::RoomMember; - let auth_events = db.rooms.get_auth_events( - room_id, - &kind.to_string().into(), - sender_user, - Some(&state_key), - &content, - )?; - - // Our depth is the maximum depth of prev_events + 1 - let depth = prev_events - .iter() - .filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth)) - .max() - .unwrap_or_else(|| uint!(0)) - + uint!(1); - - let mut unsigned = BTreeMap::new(); - - if let Some(prev_pdu) = db.rooms.room_state_get(room_id, &kind, &state_key)? { - unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone()); - unsigned.insert( - "prev_sender".to_owned(), - to_raw_value(&prev_pdu.sender).expect("UserId is valid"), - ); - } - - let pdu = PduEvent { - event_id: ruma::event_id!("$thiswillbefilledinlater").into(), - room_id: room_id.to_owned(), - sender: sender_user.to_owned(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - kind: kind.to_string().into(), - content, - state_key: Some(state_key), - prev_events, - depth, - auth_events: auth_events - .iter() - .map(|(_, pdu)| pdu.event_id.clone()) - .collect(), - redacts: None, - unsigned: if unsigned.is_empty() { - None - } else { - Some(to_raw_value(&unsigned).expect("to_raw_value always works")) - }, - hashes: EventHash { - sha256: "aaa".to_owned(), - }, - signatures: None, - }; - - let auth_check = state_res::auth_check( - &room_version, - &pdu, - None::, // TODO: third_party_invite - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|e| { - error!("{:?}", e); - Error::bad_database("Auth check failed.") - })?; - - if !auth_check { - return Err(Error::BadRequest( - ErrorKind::Forbidden, - "Event is not authorized.", - )); - } - - // Hash and sign - let mut pdu_json = - utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); - - pdu_json.remove("event_id"); - - // Add origin because synapse likes that (and it's required in the spec) - pdu_json.insert( - "origin".to_owned(), - to_canonical_value(db.globals.server_name()) - .expect("server name is a valid CanonicalJsonValue"), - ); - - ruma::signatures::hash_and_sign_event( - db.globals.server_name().as_str(), - db.globals.keypair(), - &mut pdu_json, - &room_version_id, - ) - .expect("event is valid, we just created it"); + let (pdu, pdu_json) = create_hash_and_sign_event(); let invite_room_state = db.rooms.calculate_invite_state(&pdu)?; diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 6fa83e4c..f60f735a 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -882,1163 +882,6 @@ pub async fn send_transaction_message_route( Ok(send_transaction_message::v1::Response { pdus: resolved_map }) } -/// An async function that can recursively call itself. -type AsyncRecursiveType<'a, T> = Pin + 'a + Send>>; - -/// When receiving an event one needs to: -/// 0. Check the server is in the room -/// 1. Skip the PDU if we already know about it -/// 2. Check signatures, otherwise drop -/// 3. Check content hash, redact if doesn't match -/// 4. Fetch any missing auth events doing all checks listed here starting at 1. These are not -/// timeline events -/// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are -/// also rejected "due to auth events" -/// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events -/// 7. Persist this event as an outlier -/// 8. If not timeline event: stop -/// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline -/// events -/// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities -/// doing all the checks in this list starting at 1. These are not timeline events -/// 11. Check the auth of the event passes based on the state of the event -/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by -/// doing state res where one of the inputs was a previously trusted set of state, don't just -/// trust a set of state we got from a remote) -/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" -/// it -/// 14. Use state resolution to find new room state -// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively -#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] -pub(crate) async fn handle_incoming_pdu<'a>( - origin: &'a ServerName, - event_id: &'a EventId, - room_id: &'a RoomId, - value: BTreeMap, - is_timeline_event: bool, - db: &'a Database, - pub_key_map: &'a RwLock>>, -) -> Result>, String> { - match db.rooms.exists(room_id) { - Ok(true) => {} - _ => { - return Err("Room is unknown to this server.".to_owned()); - } - } - - match db.rooms.is_disabled(room_id) { - Ok(false) => {} - _ => { - return Err("Federation of this room is currently disabled on this server.".to_owned()); - } - } - - // 1. Skip the PDU if we already have it as a timeline event - if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(event_id) { - return Ok(Some(pdu_id.to_vec())); - } - - let create_event = db - .rooms - .room_state_get(room_id, &StateEventType::RoomCreate, "") - .map_err(|_| "Failed to ask database for event.".to_owned())? - .ok_or_else(|| "Failed to find create event in db.".to_owned())?; - - let first_pdu_in_room = db - .rooms - .first_pdu_in_room(room_id) - .map_err(|_| "Error loading first room event.".to_owned())? - .expect("Room exists"); - - let (incoming_pdu, val) = handle_outlier_pdu( - origin, - &create_event, - event_id, - room_id, - value, - db, - pub_key_map, - ) - .await?; - - // 8. if not timeline event: stop - if !is_timeline_event { - return Ok(None); - } - - if incoming_pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { - return Ok(None); - } - - // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - let mut graph: HashMap, _> = HashMap::new(); - let mut eventid_info = HashMap::new(); - let mut todo_outlier_stack: Vec> = incoming_pdu.prev_events.clone(); - - let mut amount = 0; - - while let Some(prev_event_id) = todo_outlier_stack.pop() { - if let Some((pdu, json_opt)) = fetch_and_handle_outliers( - db, - origin, - &[prev_event_id.clone()], - &create_event, - room_id, - pub_key_map, - ) - .await - .pop() - { - if amount > 100 { - // Max limit reached - warn!("Max prev event limit reached!"); - graph.insert(prev_event_id.clone(), HashSet::new()); - continue; - } - - if let Some(json) = - json_opt.or_else(|| db.rooms.get_outlier_pdu_json(&prev_event_id).ok().flatten()) - { - if pdu.origin_server_ts > first_pdu_in_room.origin_server_ts { - amount += 1; - for prev_prev in &pdu.prev_events { - if !graph.contains_key(prev_prev) { - todo_outlier_stack.push(dbg!(prev_prev.clone())); - } - } - - graph.insert( - prev_event_id.clone(), - pdu.prev_events.iter().cloned().collect(), - ); - } else { - // Time based check failed - graph.insert(prev_event_id.clone(), HashSet::new()); - } - - eventid_info.insert(prev_event_id.clone(), (pdu, json)); - } else { - // Get json failed - graph.insert(prev_event_id.clone(), HashSet::new()); - } - } else { - // Fetch and handle failed - graph.insert(prev_event_id.clone(), HashSet::new()); - } - } - - let sorted = state_res::lexicographical_topological_sort(dbg!(&graph), |event_id| { - // This return value is the key used for sorting events, - // events are then sorted by power level, time, - // and lexically by event_id. - println!("{}", event_id); - Ok(( - int!(0), - MilliSecondsSinceUnixEpoch( - eventid_info - .get(event_id) - .map_or_else(|| uint!(0), |info| info.0.origin_server_ts), - ), - )) - }) - .map_err(|_| "Error sorting prev events".to_owned())?; - - let mut errors = 0; - for prev_id in dbg!(sorted) { - match db.rooms.is_disabled(room_id) { - Ok(false) => {} - _ => { - return Err( - "Federation of this room is currently disabled on this server.".to_owned(), - ); - } - } - - if let Some((time, tries)) = db - .globals - .bad_event_ratelimiter - .read() - .unwrap() - .get(&*prev_id) - { - // Exponential backoff - let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); - } - - if time.elapsed() < min_elapsed_duration { - info!("Backing off from {}", prev_id); - continue; - } - } - - if errors >= 5 { - break; - } - if let Some((pdu, json)) = eventid_info.remove(&*prev_id) { - if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { - continue; - } - - let start_time = Instant::now(); - db.globals - .roomid_federationhandletime - .write() - .unwrap() - .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); - if let Err(e) = upgrade_outlier_to_timeline_pdu( - pdu, - json, - &create_event, - origin, - db, - room_id, - pub_key_map, - ) - .await - { - errors += 1; - warn!("Prev event {} failed: {}", prev_id, e); - match db - .globals - .bad_event_ratelimiter - .write() - .unwrap() - .entry((*prev_id).to_owned()) - { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - } - hash_map::Entry::Occupied(mut e) => { - *e.get_mut() = (Instant::now(), e.get().1 + 1) - } - } - } - let elapsed = start_time.elapsed(); - db.globals - .roomid_federationhandletime - .write() - .unwrap() - .remove(&room_id.to_owned()); - warn!( - "Handling prev event {} took {}m{}s", - prev_id, - elapsed.as_secs() / 60, - elapsed.as_secs() % 60 - ); - } - } - - let start_time = Instant::now(); - db.globals - .roomid_federationhandletime - .write() - .unwrap() - .insert(room_id.to_owned(), (event_id.to_owned(), start_time)); - let r = upgrade_outlier_to_timeline_pdu( - incoming_pdu, - val, - &create_event, - origin, - db, - room_id, - pub_key_map, - ) - .await; - db.globals - .roomid_federationhandletime - .write() - .unwrap() - .remove(&room_id.to_owned()); - - r -} - -#[tracing::instrument(skip(create_event, value, db, pub_key_map))] -fn handle_outlier_pdu<'a>( - origin: &'a ServerName, - create_event: &'a PduEvent, - event_id: &'a EventId, - room_id: &'a RoomId, - value: BTreeMap, - db: &'a Database, - pub_key_map: &'a RwLock>>, -) -> AsyncRecursiveType<'a, Result<(Arc, BTreeMap), String>> { - Box::pin(async move { - // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json - - // We go through all the signatures we see on the value and fetch the corresponding signing - // keys - fetch_required_signing_keys(&value, pub_key_map, db) - .await - .map_err(|e| e.to_string())?; - - // 2. Check signatures, otherwise drop - // 3. check content hash, redact if doesn't match - - let create_event_content: RoomCreateEventContent = - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - "Invalid create event in db.".to_owned() - })?; - - let room_version_id = &create_event_content.room_version; - let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); - - let mut val = match ruma::signatures::verify_event( - &*pub_key_map.read().map_err(|_| "RwLock is poisoned.")?, - &value, - room_version_id, - ) { - Err(e) => { - // Drop - warn!("Dropping bad event {}: {}", event_id, e); - return Err("Signature verification failed".to_owned()); - } - Ok(ruma::signatures::Verified::Signatures) => { - // Redact - warn!("Calculated hash does not match: {}", event_id); - match ruma::signatures::redact(&value, room_version_id) { - Ok(obj) => obj, - Err(_) => return Err("Redaction failed".to_owned()), - } - } - Ok(ruma::signatures::Verified::All) => value, - }; - - // Now that we have checked the signature and hashes we can add the eventID and convert - // to our PduEvent type - val.insert( - "event_id".to_owned(), - CanonicalJsonValue::String(event_id.as_str().to_owned()), - ); - let incoming_pdu = serde_json::from_value::( - serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"), - ) - .map_err(|_| "Event is not a valid PDU.".to_owned())?; - - // 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events - // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" - // EDIT: Step 5 is not applied anymore because it failed too often - warn!("Fetching auth events for {}", incoming_pdu.event_id); - fetch_and_handle_outliers( - db, - origin, - &incoming_pdu - .auth_events - .iter() - .map(|x| Arc::from(&**x)) - .collect::>(), - create_event, - room_id, - pub_key_map, - ) - .await; - - // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events - info!( - "Auth check for {} based on auth events", - incoming_pdu.event_id - ); - - // Build map of auth events - let mut auth_events = HashMap::new(); - for id in &incoming_pdu.auth_events { - let auth_event = match db.rooms.get_pdu(id).map_err(|e| e.to_string())? { - Some(e) => e, - None => { - warn!("Could not find auth event {}", id); - continue; - } - }; - - match auth_events.entry(( - auth_event.kind.to_string().into(), - auth_event - .state_key - .clone() - .expect("all auth events have state keys"), - )) { - hash_map::Entry::Vacant(v) => { - v.insert(auth_event); - } - hash_map::Entry::Occupied(_) => { - return Err( - "Auth event's type and state_key combination exists multiple times." - .to_owned(), - ) - } - } - } - - // The original create event must be in the auth events - if auth_events - .get(&(StateEventType::RoomCreate, "".to_owned())) - .map(|a| a.as_ref()) - != Some(create_event) - { - return Err("Incoming event refers to wrong create event.".to_owned()); - } - - if !state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - None::, // TODO: third party invite - |k, s| auth_events.get(&(k.to_string().into(), s.to_owned())), - ) - .map_err(|_e| "Auth check failed".to_owned())? - { - return Err("Event has failed auth check with auth events.".to_owned()); - } - - info!("Validation successful."); - - // 7. Persist the event as an outlier. - db.rooms - .add_pdu_outlier(&incoming_pdu.event_id, &val) - .map_err(|_| "Failed to add pdu as outlier.".to_owned())?; - info!("Added pdu as outlier."); - - Ok((Arc::new(incoming_pdu), val)) - }) -} - -#[tracing::instrument(skip(incoming_pdu, val, create_event, db, pub_key_map))] -async fn upgrade_outlier_to_timeline_pdu( - incoming_pdu: Arc, - val: BTreeMap, - create_event: &PduEvent, - origin: &ServerName, - db: &Database, - room_id: &RoomId, - pub_key_map: &RwLock>>, -) -> Result>, String> { - if let Ok(Some(pduid)) = db.rooms.get_pdu_id(&incoming_pdu.event_id) { - return Ok(Some(pduid)); - } - - if db - .rooms - .is_event_soft_failed(&incoming_pdu.event_id) - .map_err(|_| "Failed to ask db for soft fail".to_owned())? - { - return Err("Event has been soft failed".into()); - } - - info!("Upgrading {} to timeline pdu", incoming_pdu.event_id); - - let create_event_content: RoomCreateEventContent = - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - "Invalid create event in db.".to_owned() - })?; - - let room_version_id = &create_event_content.room_version; - let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); - - // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities - // doing all the checks in this list starting at 1. These are not timeline events. - - // TODO: if we know the prev_events of the incoming event we can avoid the request and build - // the state from a known point and resolve if > 1 prev_event - - info!("Requesting state at event"); - let mut state_at_incoming_event = None; - - if incoming_pdu.prev_events.len() == 1 { - let prev_event = &*incoming_pdu.prev_events[0]; - let prev_event_sstatehash = db - .rooms - .pdu_shortstatehash(prev_event) - .map_err(|_| "Failed talking to db".to_owned())?; - - let state = if let Some(shortstatehash) = prev_event_sstatehash { - Some(db.rooms.state_full_ids(shortstatehash).await) - } else { - None - }; - - if let Some(Ok(mut state)) = state { - info!("Using cached state"); - let prev_pdu = - db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { - "Could not find prev event, but we know the state.".to_owned() - })?; - - if let Some(state_key) = &prev_pdu.state_key { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &prev_pdu.kind.to_string().into(), - state_key, - &db.globals, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - - state.insert(shortstatekey, Arc::from(prev_event)); - // Now it's the state after the pdu - } - - state_at_incoming_event = Some(state); - } - } else { - info!("Calculating state at event using state res"); - let mut extremity_sstatehashes = HashMap::new(); - - let mut okay = true; - for prev_eventid in &incoming_pdu.prev_events { - let prev_event = if let Ok(Some(pdu)) = db.rooms.get_pdu(prev_eventid) { - pdu - } else { - okay = false; - break; - }; - - let sstatehash = if let Ok(Some(s)) = db.rooms.pdu_shortstatehash(prev_eventid) { - s - } else { - okay = false; - break; - }; - - extremity_sstatehashes.insert(sstatehash, prev_event); - } - - if okay { - let mut fork_states = Vec::with_capacity(extremity_sstatehashes.len()); - let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); - - for (sstatehash, prev_event) in extremity_sstatehashes { - let mut leaf_state: BTreeMap<_, _> = db - .rooms - .state_full_ids(sstatehash) - .await - .map_err(|_| "Failed to ask db for room state.".to_owned())?; - - if let Some(state_key) = &prev_event.state_key { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &prev_event.kind.to_string().into(), - state_key, - &db.globals, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - leaf_state.insert(shortstatekey, Arc::from(&*prev_event.event_id)); - // Now it's the state after the pdu - } - - let mut state = StateMap::with_capacity(leaf_state.len()); - let mut starting_events = Vec::with_capacity(leaf_state.len()); - - for (k, id) in leaf_state { - if let Ok((ty, st_key)) = db.rooms.get_statekey_from_short(k) { - // FIXME: Undo .to_string().into() when StateMap - // is updated to use StateEventType - state.insert((ty.to_string().into(), st_key), id.clone()); - } else { - warn!("Failed to get_statekey_from_short."); - } - starting_events.push(id); - } - - auth_chain_sets.push( - get_auth_chain(room_id, starting_events, db) - .await - .map_err(|_| "Failed to load auth chain.".to_owned())? - .collect(), - ); - - fork_states.push(state); - } - - let lock = db.globals.stateres_mutex.lock(); - - let result = state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { - let res = db.rooms.get_pdu(id); - if let Err(e) = &res { - error!("LOOK AT ME Failed to fetch event: {}", e); - } - res.ok().flatten() - }); - drop(lock); - - state_at_incoming_event = match result { - Ok(new_state) => Some( - new_state - .into_iter() - .map(|((event_type, state_key), event_id)| { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &event_type.to_string().into(), - &state_key, - &db.globals, - ) - .map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; - Ok((shortstatekey, event_id)) - }) - .collect::>()?, - ), - Err(e) => { - warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e); - None - } - } - } - } - - if state_at_incoming_event.is_none() { - info!("Calling /state_ids"); - // Call /state_ids to find out what the state at this pdu is. We trust the server's - // response to some extend, but we still do a lot of checks on the events - match db - .sending - .send_federation_request( - &db.globals, - origin, - get_room_state_ids::v1::Request { - room_id, - event_id: &incoming_pdu.event_id, - }, - ) - .await - { - Ok(res) => { - info!("Fetching state events at event."); - let state_vec = fetch_and_handle_outliers( - db, - origin, - &res.pdu_ids - .iter() - .map(|x| Arc::from(&**x)) - .collect::>(), - create_event, - room_id, - pub_key_map, - ) - .await; - - let mut state: BTreeMap<_, Arc> = BTreeMap::new(); - for (pdu, _) in state_vec { - let state_key = pdu - .state_key - .clone() - .ok_or_else(|| "Found non-state pdu in state events.".to_owned())?; - - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &pdu.kind.to_string().into(), - &state_key, - &db.globals, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - - match state.entry(shortstatekey) { - btree_map::Entry::Vacant(v) => { - v.insert(Arc::from(&*pdu.event_id)); - } - btree_map::Entry::Occupied(_) => return Err( - "State event's type and state_key combination exists multiple times." - .to_owned(), - ), - } - } - - // The original create event must still be in the state - let create_shortstatekey = db - .rooms - .get_shortstatekey(&StateEventType::RoomCreate, "") - .map_err(|_| "Failed to talk to db.")? - .expect("Room exists"); - - if state.get(&create_shortstatekey).map(|id| id.as_ref()) - != Some(&create_event.event_id) - { - return Err("Incoming event refers to wrong create event.".to_owned()); - } - - state_at_incoming_event = Some(state); - } - Err(e) => { - warn!("Fetching state for event failed: {}", e); - return Err("Fetching state for event failed".into()); - } - }; - } - - let state_at_incoming_event = - state_at_incoming_event.expect("we always set this to some above"); - - info!("Starting auth check"); - // 11. Check the auth of the event passes based on the state of the event - let check_result = state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - None::, // TODO: third party invite - |k, s| { - db.rooms - .get_shortstatekey(&k.to_string().into(), s) - .ok() - .flatten() - .and_then(|shortstatekey| state_at_incoming_event.get(&shortstatekey)) - .and_then(|event_id| db.rooms.get_pdu(event_id).ok().flatten()) - }, - ) - .map_err(|_e| "Auth check failed.".to_owned())?; - - if !check_result { - return Err("Event has failed auth check with state at the event.".into()); - } - info!("Auth check succeeded"); - - // We start looking at current room state now, so lets lock the room - - let mutex_state = Arc::clone( - db.globals - .roomid_mutex_state - .write() - .unwrap() - .entry(room_id.to_owned()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - - // Now we calculate the set of extremities this room has after the incoming event has been - // applied. We start with the previous extremities (aka leaves) - info!("Calculating extremities"); - let mut extremities = db - .rooms - .get_pdu_leaves(room_id) - .map_err(|_| "Failed to load room leaves".to_owned())?; - - // Remove any forward extremities that are referenced by this incoming event's prev_events - for prev_event in &incoming_pdu.prev_events { - if extremities.contains(prev_event) { - extremities.remove(prev_event); - } - } - - // Only keep those extremities were not referenced yet - extremities.retain(|id| !matches!(db.rooms.is_event_referenced(room_id, id), Ok(true))); - - info!("Compressing state at event"); - let state_ids_compressed = state_at_incoming_event - .iter() - .map(|(shortstatekey, id)| { - db.rooms - .compress_state_event(*shortstatekey, id, &db.globals) - .map_err(|_| "Failed to compress_state_event".to_owned()) - }) - .collect::>()?; - - // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it - info!("Starting soft fail auth check"); - - let auth_events = db - .rooms - .get_auth_events( - room_id, - &incoming_pdu.kind, - &incoming_pdu.sender, - incoming_pdu.state_key.as_deref(), - &incoming_pdu.content, - ) - .map_err(|_| "Failed to get_auth_events.".to_owned())?; - - let soft_fail = !state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - None::, - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|_e| "Auth check failed.".to_owned())?; - - if soft_fail { - append_incoming_pdu( - db, - &incoming_pdu, - val, - extremities.iter().map(Deref::deref), - state_ids_compressed, - soft_fail, - &state_lock, - ) - .map_err(|e| { - warn!("Failed to add pdu to db: {}", e); - "Failed to add pdu to db.".to_owned() - })?; - - // Soft fail, we keep the event as an outlier but don't add it to the timeline - warn!("Event was soft failed: {:?}", incoming_pdu); - db.rooms - .mark_event_soft_failed(&incoming_pdu.event_id) - .map_err(|_| "Failed to set soft failed flag".to_owned())?; - return Err("Event has been soft failed".into()); - } - - if incoming_pdu.state_key.is_some() { - info!("Loading current room state ids"); - let current_sstatehash = db - .rooms - .current_shortstatehash(room_id) - .map_err(|_| "Failed to load current state hash.".to_owned())? - .expect("every room has state"); - - let current_state_ids = db - .rooms - .state_full_ids(current_sstatehash) - .await - .map_err(|_| "Failed to load room state.")?; - - info!("Preparing for stateres to derive new room state"); - let mut extremity_sstatehashes = HashMap::new(); - - info!("Loading extremities"); - for id in dbg!(&extremities) { - match db - .rooms - .get_pdu(id) - .map_err(|_| "Failed to ask db for pdu.".to_owned())? - { - Some(leaf_pdu) => { - extremity_sstatehashes.insert( - db.rooms - .pdu_shortstatehash(&leaf_pdu.event_id) - .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? - .ok_or_else(|| { - error!( - "Found extremity pdu with no statehash in db: {:?}", - leaf_pdu - ); - "Found pdu with no statehash in db.".to_owned() - })?, - leaf_pdu, - ); - } - _ => { - error!("Missing state snapshot for {:?}", id); - return Err("Missing state snapshot.".to_owned()); - } - } - } - - let mut fork_states = Vec::new(); - - // 12. Ensure that the state is derived from the previous current state (i.e. we calculated - // by doing state res where one of the inputs was a previously trusted set of state, - // don't just trust a set of state we got from a remote). - - // We do this by adding the current state to the list of fork states - extremity_sstatehashes.remove(¤t_sstatehash); - fork_states.push(current_state_ids); - - // We also add state after incoming event to the fork states - let mut state_after = state_at_incoming_event.clone(); - if let Some(state_key) = &incoming_pdu.state_key { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &incoming_pdu.kind.to_string().into(), - state_key, - &db.globals, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - - state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); - } - fork_states.push(state_after); - - let mut update_state = false; - // 14. Use state resolution to find new room state - let new_room_state = if fork_states.is_empty() { - return Err("State is empty.".to_owned()); - } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { - info!("State resolution trivial"); - // There was only one state, so it has to be the room's current state (because that is - // always included) - fork_states[0] - .iter() - .map(|(k, id)| { - db.rooms - .compress_state_event(*k, id, &db.globals) - .map_err(|_| "Failed to compress_state_event.".to_owned()) - }) - .collect::>()? - } else { - info!("Loading auth chains"); - // We do need to force an update to this room's state - update_state = true; - - let mut auth_chain_sets = Vec::new(); - for state in &fork_states { - auth_chain_sets.push( - get_auth_chain( - room_id, - state.iter().map(|(_, id)| id.clone()).collect(), - db, - ) - .await - .map_err(|_| "Failed to load auth chain.".to_owned())? - .collect(), - ); - } - - info!("Loading fork states"); - - let fork_states: Vec<_> = fork_states - .into_iter() - .map(|map| { - map.into_iter() - .filter_map(|(k, id)| { - db.rooms - .get_statekey_from_short(k) - // FIXME: Undo .to_string().into() when StateMap - // is updated to use StateEventType - .map(|(ty, st_key)| ((ty.to_string().into(), st_key), id)) - .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) - .ok() - }) - .collect::>() - }) - .collect(); - - info!("Resolving state"); - - let lock = db.globals.stateres_mutex.lock(); - let state = match state_res::resolve( - room_version_id, - &fork_states, - auth_chain_sets, - |id| { - let res = db.rooms.get_pdu(id); - if let Err(e) = &res { - error!("LOOK AT ME Failed to fetch event: {}", e); - } - res.ok().flatten() - }, - ) { - Ok(new_state) => new_state, - Err(_) => { - return Err("State resolution failed, either an event could not be found or deserialization".into()); - } - }; - - drop(lock); - - info!("State resolution done. Compressing state"); - - state - .into_iter() - .map(|((event_type, state_key), event_id)| { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &event_type.to_string().into(), - &state_key, - &db.globals, - ) - .map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; - db.rooms - .compress_state_event(shortstatekey, &event_id, &db.globals) - .map_err(|_| "Failed to compress state event".to_owned()) - }) - .collect::>()? - }; - - // Set the new room state to the resolved state - if update_state { - info!("Forcing new room state"); - db.rooms - .force_state(room_id, new_room_state, db) - .map_err(|_| "Failed to set new room state.".to_owned())?; - } - } - - info!("Appending pdu to timeline"); - extremities.insert(incoming_pdu.event_id.clone()); - - // Now that the event has passed all auth it is added into the timeline. - // We use the `state_at_event` instead of `state_after` so we accurately - // represent the state for this event. - - let pdu_id = append_incoming_pdu( - db, - &incoming_pdu, - val, - extremities.iter().map(Deref::deref), - state_ids_compressed, - soft_fail, - &state_lock, - ) - .map_err(|e| { - warn!("Failed to add pdu to db: {}", e); - "Failed to add pdu to db.".to_owned() - })?; - - info!("Appended incoming pdu"); - - // Event has passed all auth/stateres checks - drop(state_lock); - Ok(pdu_id) -} - -/// Find the event and auth it. Once the event is validated (steps 1 - 8) -/// it is appended to the outliers Tree. -/// -/// Returns pdu and if we fetched it over federation the raw json. -/// -/// a. Look in the main timeline (pduid_pdu tree) -/// b. Look at outlier pdu tree -/// c. Ask origin server over federation -/// d. TODO: Ask other servers over federation? -#[tracing::instrument(skip_all)] -pub(crate) fn fetch_and_handle_outliers<'a>( - db: &'a Database, - origin: &'a ServerName, - events: &'a [Arc], - create_event: &'a PduEvent, - room_id: &'a RoomId, - pub_key_map: &'a RwLock>>, -) -> AsyncRecursiveType<'a, Vec<(Arc, Option>)>> { - Box::pin(async move { - let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - } - hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), - }; - - let mut pdus = vec![]; - for id in events { - if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&**id) - { - // Exponential backoff - let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); - } - - if time.elapsed() < min_elapsed_duration { - info!("Backing off from {}", id); - continue; - } - } - - // a. Look in the main timeline (pduid_pdu tree) - // b. Look at outlier pdu tree - // (get_pdu_json checks both) - if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) { - trace!("Found {} in db", id); - pdus.push((local_pdu, None)); - continue; - } - - // c. Ask origin server over federation - // We also handle its auth chain here so we don't get a stack overflow in - // handle_outlier_pdu. - let mut todo_auth_events = vec![Arc::clone(id)]; - let mut events_in_reverse_order = Vec::new(); - let mut events_all = HashSet::new(); - let mut i = 0; - while let Some(next_id) = todo_auth_events.pop() { - if events_all.contains(&next_id) { - continue; - } - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - - if let Ok(Some(_)) = db.rooms.get_pdu(&next_id) { - trace!("Found {} in db", id); - continue; - } - - info!("Fetching {} over federation.", next_id); - match db - .sending - .send_federation_request( - &db.globals, - origin, - get_event::v1::Request { event_id: &next_id }, - ) - .await - { - Ok(res) => { - info!("Got {} over federation", next_id); - let (calculated_event_id, value) = - match crate::pdu::gen_event_id_canonical_json(&res.pdu, &db) { - Ok(t) => t, - Err(_) => { - back_off((*next_id).to_owned()); - continue; - } - }; - - if calculated_event_id != *next_id { - warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", - next_id, calculated_event_id, &res.pdu); - } - - if let Some(auth_events) = - value.get("auth_events").and_then(|c| c.as_array()) - { - for auth_event in auth_events { - if let Ok(auth_event) = - serde_json::from_value(auth_event.clone().into()) - { - let a: Arc = auth_event; - todo_auth_events.push(a); - } else { - warn!("Auth event id is not valid"); - } - } - } else { - warn!("Auth event list invalid"); - } - - events_in_reverse_order.push((next_id.clone(), value)); - events_all.insert(next_id); - } - Err(_) => { - warn!("Failed to fetch event: {}", next_id); - back_off((*next_id).to_owned()); - } - } - } - - for (next_id, value) in events_in_reverse_order.iter().rev() { - match handle_outlier_pdu( - origin, - create_event, - next_id, - room_id, - value.clone(), - db, - pub_key_map, - ) - .await - { - Ok((pdu, json)) => { - if next_id == id { - pdus.push((pdu, Some(json))); - } - } - Err(e) => { - warn!("Authentication of event {} failed: {:?}", next_id, e); - back_off((**next_id).to_owned()); - } - } - } - } - pdus - }) -} - /// Search the DB for the signing keys of the given server, if we don't have them /// fetch them from the server and save to our DB. #[tracing::instrument(skip_all)] @@ -2204,92 +1047,6 @@ pub(crate) async fn fetch_signing_keys( )) } -/// Append the incoming event setting the state snapshot to the state from the -/// server that sent the event. -#[tracing::instrument(skip_all)] -fn append_incoming_pdu<'a>( - db: &Database, - pdu: &PduEvent, - pdu_json: CanonicalJsonObject, - new_room_leaves: impl IntoIterator + Clone + Debug, - state_ids_compressed: HashSet, - soft_fail: bool, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex -) -> Result>> { - // We append to state before appending the pdu, so we don't have a moment in time with the - // pdu without it's state. This is okay because append_pdu can't fail. - db.rooms.set_event_state( - &pdu.event_id, - &pdu.room_id, - state_ids_compressed, - &db.globals, - )?; - - if soft_fail { - db.rooms - .mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; - db.rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?; - return Ok(None); - } - - let pdu_id = db.rooms.append_pdu(pdu, pdu_json, new_room_leaves, db)?; - - for appservice in db.appservice.all()? { - if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { - db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; - continue; - } - - if let Some(namespaces) = appservice.1.get("namespaces") { - let users = namespaces - .get("users") - .and_then(|users| users.as_sequence()) - .map_or_else(Vec::new, |users| { - users - .iter() - .filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok()) - .collect::>() - }); - let aliases = namespaces - .get("aliases") - .and_then(|aliases| aliases.as_sequence()) - .map_or_else(Vec::new, |aliases| { - aliases - .iter() - .filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok()) - .collect::>() - }); - let rooms = namespaces - .get("rooms") - .and_then(|rooms| rooms.as_sequence()); - - let matching_users = |users: &Regex| { - users.is_match(pdu.sender.as_str()) - || pdu.kind == RoomEventType::RoomMember - && pdu - .state_key - .as_ref() - .map_or(false, |state_key| users.is_match(state_key)) - }; - let matching_aliases = |aliases: &Regex| { - db.rooms - .room_aliases(&pdu.room_id) - .filter_map(|r| r.ok()) - .any(|room_alias| aliases.is_match(room_alias.as_str())) - }; - - if aliases.iter().any(matching_aliases) - || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) - || users.iter().any(matching_users) - { - db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; - } - } - } - - Ok(Some(pdu_id)) -} - #[tracing::instrument(skip(starting_events, db))] pub(crate) async fn get_auth_chain<'a>( room_id: &RoomId, @@ -2745,35 +1502,6 @@ pub async fn create_join_event_template_route( } } - let prev_events: Vec<_> = db - .rooms - .get_pdu_leaves(&body.room_id)? - .into_iter() - .take(20) - .collect(); - - let create_event = db - .rooms - .room_state_get(&body.room_id, &StateEventType::RoomCreate, "")?; - - let create_event_content: Option = create_event - .as_ref() - .map(|create_event| { - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - Error::bad_database("Invalid create event in db.") - }) - }) - .transpose()?; - - // If there was no create event yet, assume we are creating a room with the default version - // right now - let room_version_id = create_event_content - .map_or(db.globals.default_room_version(), |create_event| { - create_event.room_version - }); - let room_version = RoomVersion::new(&room_version_id).expect("room version is supported"); - if !body.ver.contains(&room_version_id) { return Err(Error::BadRequest( ErrorKind::IncompatibleRoomVersion { @@ -2798,89 +1526,7 @@ pub async fn create_join_event_template_route( let state_key = body.user_id.to_string(); let kind = StateEventType::RoomMember; - let auth_events = db.rooms.get_auth_events( - &body.room_id, - &kind.to_string().into(), - &body.user_id, - Some(&state_key), - &content, - )?; - - // Our depth is the maximum depth of prev_events + 1 - let depth = prev_events - .iter() - .filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth)) - .max() - .unwrap_or_else(|| uint!(0)) - + uint!(1); - - let mut unsigned = BTreeMap::new(); - - if let Some(prev_pdu) = db.rooms.room_state_get(&body.room_id, &kind, &state_key)? { - unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone()); - unsigned.insert( - "prev_sender".to_owned(), - to_raw_value(&prev_pdu.sender).expect("UserId is valid"), - ); - } - - let pdu = PduEvent { - event_id: ruma::event_id!("$thiswillbefilledinlater").into(), - room_id: body.room_id.clone(), - sender: body.user_id.clone(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - kind: kind.to_string().into(), - content, - state_key: Some(state_key), - prev_events, - depth, - auth_events: auth_events - .iter() - .map(|(_, pdu)| pdu.event_id.clone()) - .collect(), - redacts: None, - unsigned: if unsigned.is_empty() { - None - } else { - Some(to_raw_value(&unsigned).expect("to_raw_value always works")) - }, - hashes: EventHash { - sha256: "aaa".to_owned(), - }, - signatures: None, - }; - - let auth_check = state_res::auth_check( - &room_version, - &pdu, - None::, // TODO: third_party_invite - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|e| { - error!("{:?}", e); - Error::bad_database("Auth check failed.") - })?; - - if !auth_check { - return Err(Error::BadRequest( - ErrorKind::Forbidden, - "Event is not authorized.", - )); - } - - // Hash and sign - let mut pdu_json = - utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); - - pdu_json.remove("event_id"); - - // Add origin because synapse likes that (and it's required in the spec) - pdu_json.insert( - "origin".to_owned(), - CanonicalJsonValue::String(db.globals.server_name().as_str().to_owned()), - ); + let (pdu, pdu_json) = create_hash_and_sign_event(); Ok(prepare_join_event::v1::Response { room_version: Some(room_version_id), diff --git a/src/service/rooms/alias.rs b/src/service/rooms/alias/mod.rs similarity index 100% rename from src/service/rooms/alias.rs rename to src/service/rooms/alias/mod.rs diff --git a/src/service/rooms/directory.rs b/src/service/rooms/directory/mod.rs similarity index 100% rename from src/service/rooms/directory.rs rename to src/service/rooms/directory/mod.rs diff --git a/src/service/rooms/edus.rs b/src/service/rooms/edus/mod.rs similarity index 100% rename from src/service/rooms/edus.rs rename to src/service/rooms/edus/mod.rs diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs new file mode 100644 index 00000000..e59219b2 --- /dev/null +++ b/src/service/rooms/event_handler/mod.rs @@ -0,0 +1,1152 @@ + +/// An async function that can recursively call itself. +type AsyncRecursiveType<'a, T> = Pin + 'a + Send>>; + +/// When receiving an event one needs to: +/// 0. Check the server is in the room +/// 1. Skip the PDU if we already know about it +/// 2. Check signatures, otherwise drop +/// 3. Check content hash, redact if doesn't match +/// 4. Fetch any missing auth events doing all checks listed here starting at 1. These are not +/// timeline events +/// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are +/// also rejected "due to auth events" +/// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events +/// 7. Persist this event as an outlier +/// 8. If not timeline event: stop +/// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline +/// events +/// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities +/// doing all the checks in this list starting at 1. These are not timeline events +/// 11. Check the auth of the event passes based on the state of the event +/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by +/// doing state res where one of the inputs was a previously trusted set of state, don't just +/// trust a set of state we got from a remote) +/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" +/// it +/// 14. Use state resolution to find new room state +// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively +#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] +pub(crate) async fn handle_incoming_pdu<'a>( + origin: &'a ServerName, + event_id: &'a EventId, + room_id: &'a RoomId, + value: BTreeMap, + is_timeline_event: bool, + db: &'a Database, + pub_key_map: &'a RwLock>>, +) -> Result>> { + db.rooms.exists(room_id)?.ok_or(Error::BadRequest(ErrorKind::NotFound, "Room is unknown to this server"))?; + + db.rooms.is_disabled(room_id)?.ok_or(Error::BadRequest(ErrorKind::Forbidden, "Federation of this room is currently disabled on this server."))?; + + // 1. Skip the PDU if we already have it as a timeline event + if let Some(pdu_id) = db.rooms.get_pdu_id(event_id)? { + return Some(pdu_id.to_vec()); + } + + let create_event = db + .rooms + .room_state_get(room_id, &StateEventType::RoomCreate, "")? + .ok_or_else(|| Error::bad_database("Failed to find create event in db."))?; + + let first_pdu_in_room = db + .rooms + .first_pdu_in_room(room_id)? + .ok_or_else(|| Error::bad_database("Failed to find first pdu in db."))?; + + let (incoming_pdu, val) = handle_outlier_pdu( + origin, + &create_event, + event_id, + room_id, + value, + db, + pub_key_map, + ) + .await?; + + // 8. if not timeline event: stop + if !is_timeline_event { + return Ok(None); + } + + // Skip old events + if incoming_pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { + return Ok(None); + } + + // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events + let sorted_prev_events = fetch_unknown_prev_events(incoming_pdu.prev_events.clone()); + + let mut errors = 0; + for prev_id in dbg!(sorted) { + // Check for disabled again because it might have changed + db.rooms.is_disabled(room_id)?.ok_or(Error::BadRequest(ErrorKind::Forbidden, "Federation of + this room is currently disabled on this server."))?; + + if let Some((time, tries)) = db + .globals + .bad_event_ratelimiter + .read() + .unwrap() + .get(&*prev_id) + { + // Exponential backoff + let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { + min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + } + + if time.elapsed() < min_elapsed_duration { + info!("Backing off from {}", prev_id); + continue; + } + } + + if errors >= 5 { + break; + } + + if let Some((pdu, json)) = eventid_info.remove(&*prev_id) { + // Skip old events + if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { + continue; + } + + let start_time = Instant::now(); + db.globals + .roomid_federationhandletime + .write() + .unwrap() + .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); + + if let Err(e) = upgrade_outlier_to_timeline_pdu( + pdu, + json, + &create_event, + origin, + db, + room_id, + pub_key_map, + ) + .await + { + errors += 1; + warn!("Prev event {} failed: {}", prev_id, e); + match db + .globals + .bad_event_ratelimiter + .write() + .unwrap() + .entry((*prev_id).to_owned()) + { + hash_map::Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + hash_map::Entry::Occupied(mut e) => { + *e.get_mut() = (Instant::now(), e.get().1 + 1) + } + } + } + let elapsed = start_time.elapsed(); + db.globals + .roomid_federationhandletime + .write() + .unwrap() + .remove(&room_id.to_owned()); + warn!( + "Handling prev event {} took {}m{}s", + prev_id, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); + } + } + + // Done with prev events, now handling the incoming event + + let start_time = Instant::now(); + db.globals + .roomid_federationhandletime + .write() + .unwrap() + .insert(room_id.to_owned(), (event_id.to_owned(), start_time)); + let r = upgrade_outlier_to_timeline_pdu( + incoming_pdu, + val, + &create_event, + origin, + db, + room_id, + pub_key_map, + ) + .await; + db.globals + .roomid_federationhandletime + .write() + .unwrap() + .remove(&room_id.to_owned()); + + r +} + +#[tracing::instrument(skip(create_event, value, db, pub_key_map))] +fn handle_outlier_pdu<'a>( + origin: &'a ServerName, + create_event: &'a PduEvent, + event_id: &'a EventId, + room_id: &'a RoomId, + value: BTreeMap, + db: &'a Database, + pub_key_map: &'a RwLock>>, +) -> AsyncRecursiveType<'a, Result<(Arc, BTreeMap), String>> { + Box::pin(async move { + // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json + + // We go through all the signatures we see on the value and fetch the corresponding signing + // keys + fetch_required_signing_keys(&value, pub_key_map, db) + .await?; + + // 2. Check signatures, otherwise drop + // 3. check content hash, redact if doesn't match + let create_event_content: RoomCreateEventContent = + serde_json::from_str(create_event.content.get()).map_err(|e| { + error!("Invalid create event: {}", e); + Error::BadDatabase("Invalid create event in db") + })?; + + let room_version_id = &create_event_content.room_version; + let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); + + let mut val = match ruma::signatures::verify_event( + &*pub_key_map.read().map_err(|_| "RwLock is poisoned.")?, + &value, + room_version_id, + ) { + Err(e) => { + // Drop + warn!("Dropping bad event {}: {}", event_id, e); + return Err("Signature verification failed".to_owned()); + } + Ok(ruma::signatures::Verified::Signatures) => { + // Redact + warn!("Calculated hash does not match: {}", event_id); + match ruma::signatures::redact(&value, room_version_id) { + Ok(obj) => obj, + Err(_) => return Err("Redaction failed".to_owned()), + } + } + Ok(ruma::signatures::Verified::All) => value, + }; + + // Now that we have checked the signature and hashes we can add the eventID and convert + // to our PduEvent type + val.insert( + "event_id".to_owned(), + CanonicalJsonValue::String(event_id.as_str().to_owned()), + ); + let incoming_pdu = serde_json::from_value::( + serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"), + ) + .map_err(|_| "Event is not a valid PDU.".to_owned())?; + + // 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events + // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" + // NOTE: Step 5 is not applied anymore because it failed too often + warn!("Fetching auth events for {}", incoming_pdu.event_id); + fetch_and_handle_outliers( + db, + origin, + &incoming_pdu + .auth_events + .iter() + .map(|x| Arc::from(&**x)) + .collect::>(), + create_event, + room_id, + pub_key_map, + ) + .await; + + // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events + info!( + "Auth check for {} based on auth events", + incoming_pdu.event_id + ); + + // Build map of auth events + let mut auth_events = HashMap::new(); + for id in &incoming_pdu.auth_events { + let auth_event = match db.rooms.get_pdu(id)? { + Some(e) => e, + None => { + warn!("Could not find auth event {}", id); + continue; + } + }; + + match auth_events.entry(( + auth_event.kind.to_string().into(), + auth_event + .state_key + .clone() + .expect("all auth events have state keys"), + )) { + hash_map::Entry::Vacant(v) => { + v.insert(auth_event); + } + hash_map::Entry::Occupied(_) => { + return Err(Error::BadRequest(ErrorKind::InvalidParam, + "Auth event's type and state_key combination exists multiple times." + )); + } + } + } + + // The original create event must be in the auth events + if auth_events + .get(&(StateEventType::RoomCreate, "".to_owned())) + .map(|a| a.as_ref()) + != Some(create_event) + { + return Err(Error::BadRequest(ErrorKind::InvalidParam("Incoming event refers to wrong create event."))); + } + + if !state_res::event_auth::auth_check( + &room_version, + &incoming_pdu, + None::, // TODO: third party invite + |k, s| auth_events.get(&(k.to_string().into(), s.to_owned())), + ) + .map_err(|e| {error!(e); Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed")})? + { + return Err(Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed")); + } + + info!("Validation successful."); + + // 7. Persist the event as an outlier. + db.rooms + .add_pdu_outlier(&incoming_pdu.event_id, &val)?; + + info!("Added pdu as outlier."); + + Ok((Arc::new(incoming_pdu), val)) + }) +} + +#[tracing::instrument(skip(incoming_pdu, val, create_event, db, pub_key_map))] +async fn upgrade_outlier_to_timeline_pdu( + incoming_pdu: Arc, + val: BTreeMap, + create_event: &PduEvent, + origin: &ServerName, + db: &Database, + room_id: &RoomId, + pub_key_map: &RwLock>>, +) -> Result>, String> { + // Skip the PDU if we already have it as a timeline event + if let Ok(Some(pduid)) = db.rooms.get_pdu_id(&incoming_pdu.event_id) { + return Ok(Some(pduid)); + } + + if db + .rooms + .is_event_soft_failed(&incoming_pdu.event_id) + .map_err(|_| "Failed to ask db for soft fail".to_owned())? + { + return Err("Event has been soft failed".into()); + } + + info!("Upgrading {} to timeline pdu", incoming_pdu.event_id); + + let create_event_content: RoomCreateEventContent = + serde_json::from_str(create_event.content.get()).map_err(|e| { + warn!("Invalid create event: {}", e); + Error::BadDatabase("Invalid create event in db") + })?; + + let room_version_id = &create_event_content.room_version; + let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); + + // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities + // doing all the checks in this list starting at 1. These are not timeline events. + + // TODO: if we know the prev_events of the incoming event we can avoid the request and build + // the state from a known point and resolve if > 1 prev_event + + info!("Requesting state at event"); + let mut state_at_incoming_event = None; + + if incoming_pdu.prev_events.len() == 1 { + let prev_event = &*incoming_pdu.prev_events[0]; + let prev_event_sstatehash = db + .rooms + .pdu_shortstatehash(prev_event) + .map_err(|_| "Failed talking to db".to_owned())?; + + let state = if let Some(shortstatehash) = prev_event_sstatehash { + Some(db.rooms.state_full_ids(shortstatehash).await) + } else { + None + }; + + if let Some(Ok(mut state)) = state { + info!("Using cached state"); + let prev_pdu = + db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { + "Could not find prev event, but we know the state.".to_owned() + })?; + + if let Some(state_key) = &prev_pdu.state_key { + let shortstatekey = db + .rooms + .get_or_create_shortstatekey( + &prev_pdu.kind.to_string().into(), + state_key, + &db.globals, + ) + .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + + state.insert(shortstatekey, Arc::from(prev_event)); + // Now it's the state after the pdu + } + + state_at_incoming_event = Some(state); + } + } else { + info!("Calculating state at event using state res"); + let mut extremity_sstatehashes = HashMap::new(); + + let mut okay = true; + for prev_eventid in &incoming_pdu.prev_events { + let prev_event = if let Ok(Some(pdu)) = db.rooms.get_pdu(prev_eventid) { + pdu + } else { + okay = false; + break; + }; + + let sstatehash = if let Ok(Some(s)) = db.rooms.pdu_shortstatehash(prev_eventid) { + s + } else { + okay = false; + break; + }; + + extremity_sstatehashes.insert(sstatehash, prev_event); + } + + if okay { + let mut fork_states = Vec::with_capacity(extremity_sstatehashes.len()); + let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); + + for (sstatehash, prev_event) in extremity_sstatehashes { + let mut leaf_state: BTreeMap<_, _> = db + .rooms + .state_full_ids(sstatehash) + .await + .map_err(|_| "Failed to ask db for room state.".to_owned())?; + + if let Some(state_key) = &prev_event.state_key { + let shortstatekey = db + .rooms + .get_or_create_shortstatekey( + &prev_event.kind.to_string().into(), + state_key, + &db.globals, + ) + .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + leaf_state.insert(shortstatekey, Arc::from(&*prev_event.event_id)); + // Now it's the state after the pdu + } + + let mut state = StateMap::with_capacity(leaf_state.len()); + let mut starting_events = Vec::with_capacity(leaf_state.len()); + + for (k, id) in leaf_state { + if let Ok((ty, st_key)) = db.rooms.get_statekey_from_short(k) { + // FIXME: Undo .to_string().into() when StateMap + // is updated to use StateEventType + state.insert((ty.to_string().into(), st_key), id.clone()); + } else { + warn!("Failed to get_statekey_from_short."); + } + starting_events.push(id); + } + + auth_chain_sets.push( + get_auth_chain(room_id, starting_events, db) + .await + .map_err(|_| "Failed to load auth chain.".to_owned())? + .collect(), + ); + + fork_states.push(state); + } + + let lock = db.globals.stateres_mutex.lock(); + + let result = state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { + let res = db.rooms.get_pdu(id); + if let Err(e) = &res { + error!("LOOK AT ME Failed to fetch event: {}", e); + } + res.ok().flatten() + }); + drop(lock); + + state_at_incoming_event = match result { + Ok(new_state) => Some( + new_state + .into_iter() + .map(|((event_type, state_key), event_id)| { + let shortstatekey = db + .rooms + .get_or_create_shortstatekey( + &event_type.to_string().into(), + &state_key, + &db.globals, + ) + .map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; + Ok((shortstatekey, event_id)) + }) + .collect::>()?, + ), + Err(e) => { + warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e); + None + } + } + } + } + + if state_at_incoming_event.is_none() { + info!("Calling /state_ids"); + // Call /state_ids to find out what the state at this pdu is. We trust the server's + // response to some extend, but we still do a lot of checks on the events + match db + .sending + .send_federation_request( + &db.globals, + origin, + get_room_state_ids::v1::Request { + room_id, + event_id: &incoming_pdu.event_id, + }, + ) + .await + { + Ok(res) => { + info!("Fetching state events at event."); + let state_vec = fetch_and_handle_outliers( + db, + origin, + &res.pdu_ids + .iter() + .map(|x| Arc::from(&**x)) + .collect::>(), + create_event, + room_id, + pub_key_map, + ) + .await; + + let mut state: BTreeMap<_, Arc> = BTreeMap::new(); + for (pdu, _) in state_vec { + let state_key = pdu + .state_key + .clone() + .ok_or_else(|| "Found non-state pdu in state events.".to_owned())?; + + let shortstatekey = db + .rooms + .get_or_create_shortstatekey( + &pdu.kind.to_string().into(), + &state_key, + &db.globals, + ) + .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + + match state.entry(shortstatekey) { + btree_map::Entry::Vacant(v) => { + v.insert(Arc::from(&*pdu.event_id)); + } + btree_map::Entry::Occupied(_) => return Err( + "State event's type and state_key combination exists multiple times." + .to_owned(), + ), + } + } + + // The original create event must still be in the state + let create_shortstatekey = db + .rooms + .get_shortstatekey(&StateEventType::RoomCreate, "") + .map_err(|_| "Failed to talk to db.")? + .expect("Room exists"); + + if state.get(&create_shortstatekey).map(|id| id.as_ref()) + != Some(&create_event.event_id) + { + return Err("Incoming event refers to wrong create event.".to_owned()); + } + + state_at_incoming_event = Some(state); + } + Err(e) => { + warn!("Fetching state for event failed: {}", e); + return Err("Fetching state for event failed".into()); + } + }; + } + + let state_at_incoming_event = + state_at_incoming_event.expect("we always set this to some above"); + + info!("Starting auth check"); + // 11. Check the auth of the event passes based on the state of the event + let check_result = state_res::event_auth::auth_check( + &room_version, + &incoming_pdu, + None::, // TODO: third party invite + |k, s| { + db.rooms + .get_shortstatekey(&k.to_string().into(), s) + .ok() + .flatten() + .and_then(|shortstatekey| state_at_incoming_event.get(&shortstatekey)) + .and_then(|event_id| db.rooms.get_pdu(event_id).ok().flatten()) + }, + ) + .map_err(|_e| "Auth check failed.".to_owned())?; + + if !check_result { + return Err("Event has failed auth check with state at the event.".into()); + } + info!("Auth check succeeded"); + + // We start looking at current room state now, so lets lock the room + + let mutex_state = Arc::clone( + db.globals + .roomid_mutex_state + .write() + .unwrap() + .entry(room_id.to_owned()) + .or_default(), + ); + let state_lock = mutex_state.lock().await; + + // Now we calculate the set of extremities this room has after the incoming event has been + // applied. We start with the previous extremities (aka leaves) + info!("Calculating extremities"); + let mut extremities = db + .rooms + .get_pdu_leaves(room_id) + .map_err(|_| "Failed to load room leaves".to_owned())?; + + // Remove any forward extremities that are referenced by this incoming event's prev_events + for prev_event in &incoming_pdu.prev_events { + if extremities.contains(prev_event) { + extremities.remove(prev_event); + } + } + + // Only keep those extremities were not referenced yet + extremities.retain(|id| !matches!(db.rooms.is_event_referenced(room_id, id), Ok(true))); + + info!("Compressing state at event"); + let state_ids_compressed = state_at_incoming_event + .iter() + .map(|(shortstatekey, id)| { + db.rooms + .compress_state_event(*shortstatekey, id, &db.globals) + .map_err(|_| "Failed to compress_state_event".to_owned()) + }) + .collect::>()?; + + // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it + info!("Starting soft fail auth check"); + + let auth_events = db + .rooms + .get_auth_events( + room_id, + &incoming_pdu.kind, + &incoming_pdu.sender, + incoming_pdu.state_key.as_deref(), + &incoming_pdu.content, + ) + .map_err(|_| "Failed to get_auth_events.".to_owned())?; + + let soft_fail = !state_res::event_auth::auth_check( + &room_version, + &incoming_pdu, + None::, + |k, s| auth_events.get(&(k.clone(), s.to_owned())), + ) + .map_err(|_e| "Auth check failed.".to_owned())?; + + if soft_fail { + append_incoming_pdu( + db, + &incoming_pdu, + val, + extremities.iter().map(Deref::deref), + state_ids_compressed, + soft_fail, + &state_lock, + ) + .map_err(|e| { + warn!("Failed to add pdu to db: {}", e); + "Failed to add pdu to db.".to_owned() + })?; + + // Soft fail, we keep the event as an outlier but don't add it to the timeline + warn!("Event was soft failed: {:?}", incoming_pdu); + db.rooms + .mark_event_soft_failed(&incoming_pdu.event_id) + .map_err(|_| "Failed to set soft failed flag".to_owned())?; + return Err("Event has been soft failed".into()); + } + + if incoming_pdu.state_key.is_some() { + info!("Loading current room state ids"); + let current_sstatehash = db + .rooms + .current_shortstatehash(room_id) + .map_err(|_| "Failed to load current state hash.".to_owned())? + .expect("every room has state"); + + let current_state_ids = db + .rooms + .state_full_ids(current_sstatehash) + .await + .map_err(|_| "Failed to load room state.")?; + + info!("Preparing for stateres to derive new room state"); + let mut extremity_sstatehashes = HashMap::new(); + + info!("Loading extremities"); + for id in dbg!(&extremities) { + match db + .rooms + .get_pdu(id) + .map_err(|_| "Failed to ask db for pdu.".to_owned())? + { + Some(leaf_pdu) => { + extremity_sstatehashes.insert( + db.rooms + .pdu_shortstatehash(&leaf_pdu.event_id) + .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? + .ok_or_else(|| { + error!( + "Found extremity pdu with no statehash in db: {:?}", + leaf_pdu + ); + "Found pdu with no statehash in db.".to_owned() + })?, + leaf_pdu, + ); + } + _ => { + error!("Missing state snapshot for {:?}", id); + return Err("Missing state snapshot.".to_owned()); + } + } + } + + let mut fork_states = Vec::new(); + + // 12. Ensure that the state is derived from the previous current state (i.e. we calculated + // by doing state res where one of the inputs was a previously trusted set of state, + // don't just trust a set of state we got from a remote). + + // We do this by adding the current state to the list of fork states + extremity_sstatehashes.remove(¤t_sstatehash); + fork_states.push(current_state_ids); + + // We also add state after incoming event to the fork states + let mut state_after = state_at_incoming_event.clone(); + if let Some(state_key) = &incoming_pdu.state_key { + let shortstatekey = db + .rooms + .get_or_create_shortstatekey( + &incoming_pdu.kind.to_string().into(), + state_key, + &db.globals, + ) + .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + + state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); + } + fork_states.push(state_after); + + let mut update_state = false; + // 14. Use state resolution to find new room state + let new_room_state = if fork_states.is_empty() { + return Err("State is empty.".to_owned()); + } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { + info!("State resolution trivial"); + // There was only one state, so it has to be the room's current state (because that is + // always included) + fork_states[0] + .iter() + .map(|(k, id)| { + db.rooms + .compress_state_event(*k, id, &db.globals) + .map_err(|_| "Failed to compress_state_event.".to_owned()) + }) + .collect::>()? + } else { + info!("Loading auth chains"); + // We do need to force an update to this room's state + update_state = true; + + let mut auth_chain_sets = Vec::new(); + for state in &fork_states { + auth_chain_sets.push( + get_auth_chain( + room_id, + state.iter().map(|(_, id)| id.clone()).collect(), + db, + ) + .await + .map_err(|_| "Failed to load auth chain.".to_owned())? + .collect(), + ); + } + + info!("Loading fork states"); + + let fork_states: Vec<_> = fork_states + .into_iter() + .map(|map| { + map.into_iter() + .filter_map(|(k, id)| { + db.rooms + .get_statekey_from_short(k) + // FIXME: Undo .to_string().into() when StateMap + // is updated to use StateEventType + .map(|(ty, st_key)| ((ty.to_string().into(), st_key), id)) + .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) + .ok() + }) + .collect::>() + }) + .collect(); + + info!("Resolving state"); + + let lock = db.globals.stateres_mutex.lock(); + let state = match state_res::resolve( + room_version_id, + &fork_states, + auth_chain_sets, + |id| { + let res = db.rooms.get_pdu(id); + if let Err(e) = &res { + error!("LOOK AT ME Failed to fetch event: {}", e); + } + res.ok().flatten() + }, + ) { + Ok(new_state) => new_state, + Err(_) => { + return Err("State resolution failed, either an event could not be found or deserialization".into()); + } + }; + + drop(lock); + + info!("State resolution done. Compressing state"); + + state + .into_iter() + .map(|((event_type, state_key), event_id)| { + let shortstatekey = db + .rooms + .get_or_create_shortstatekey( + &event_type.to_string().into(), + &state_key, + &db.globals, + ) + .map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; + db.rooms + .compress_state_event(shortstatekey, &event_id, &db.globals) + .map_err(|_| "Failed to compress state event".to_owned()) + }) + .collect::>()? + }; + + // Set the new room state to the resolved state + if update_state { + info!("Forcing new room state"); + db.rooms + .force_state(room_id, new_room_state, db) + .map_err(|_| "Failed to set new room state.".to_owned())?; + } + } + + info!("Appending pdu to timeline"); + extremities.insert(incoming_pdu.event_id.clone()); + + // Now that the event has passed all auth it is added into the timeline. + // We use the `state_at_event` instead of `state_after` so we accurately + // represent the state for this event. + + let pdu_id = append_incoming_pdu( + db, + &incoming_pdu, + val, + extremities.iter().map(Deref::deref), + state_ids_compressed, + soft_fail, + &state_lock, + ) + .map_err(|e| { + warn!("Failed to add pdu to db: {}", e); + "Failed to add pdu to db.".to_owned() + })?; + + info!("Appended incoming pdu"); + + // Event has passed all auth/stateres checks + drop(state_lock); + Ok(pdu_id) +} + +/// Find the event and auth it. Once the event is validated (steps 1 - 8) +/// it is appended to the outliers Tree. +/// +/// Returns pdu and if we fetched it over federation the raw json. +/// +/// a. Look in the main timeline (pduid_pdu tree) +/// b. Look at outlier pdu tree +/// c. Ask origin server over federation +/// d. TODO: Ask other servers over federation? +#[tracing::instrument(skip_all)] +pub(crate) fn fetch_and_handle_outliers<'a>( + db: &'a Database, + origin: &'a ServerName, + events: &'a [Arc], + create_event: &'a PduEvent, + room_id: &'a RoomId, + pub_key_map: &'a RwLock>>, +) -> AsyncRecursiveType<'a, Vec<(Arc, Option>)>> { + Box::pin(async move { + let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { + hash_map::Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), + }; + + let mut pdus = vec![]; + for id in events { + if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&**id) + { + // Exponential backoff + let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { + min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + } + + if time.elapsed() < min_elapsed_duration { + info!("Backing off from {}", id); + continue; + } + } + + // a. Look in the main timeline (pduid_pdu tree) + // b. Look at outlier pdu tree + // (get_pdu_json checks both) + if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) { + trace!("Found {} in db", id); + pdus.push((local_pdu, None)); + continue; + } + + // c. Ask origin server over federation + // We also handle its auth chain here so we don't get a stack overflow in + // handle_outlier_pdu. + let mut todo_auth_events = vec![Arc::clone(id)]; + let mut events_in_reverse_order = Vec::new(); + let mut events_all = HashSet::new(); + let mut i = 0; + while let Some(next_id) = todo_auth_events.pop() { + if events_all.contains(&next_id) { + continue; + } + + i += 1; + if i % 100 == 0 { + tokio::task::yield_now().await; + } + + if let Ok(Some(_)) = db.rooms.get_pdu(&next_id) { + trace!("Found {} in db", id); + continue; + } + + info!("Fetching {} over federation.", next_id); + match db + .sending + .send_federation_request( + &db.globals, + origin, + get_event::v1::Request { event_id: &next_id }, + ) + .await + { + Ok(res) => { + info!("Got {} over federation", next_id); + let (calculated_event_id, value) = + match crate::pdu::gen_event_id_canonical_json(&res.pdu, &db) { + Ok(t) => t, + Err(_) => { + back_off((*next_id).to_owned()); + continue; + } + }; + + if calculated_event_id != *next_id { + warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", + next_id, calculated_event_id, &res.pdu); + } + + if let Some(auth_events) = + value.get("auth_events").and_then(|c| c.as_array()) + { + for auth_event in auth_events { + if let Ok(auth_event) = + serde_json::from_value(auth_event.clone().into()) + { + let a: Arc = auth_event; + todo_auth_events.push(a); + } else { + warn!("Auth event id is not valid"); + } + } + } else { + warn!("Auth event list invalid"); + } + + events_in_reverse_order.push((next_id.clone(), value)); + events_all.insert(next_id); + } + Err(_) => { + warn!("Failed to fetch event: {}", next_id); + back_off((*next_id).to_owned()); + } + } + } + + for (next_id, value) in events_in_reverse_order.iter().rev() { + match handle_outlier_pdu( + origin, + create_event, + next_id, + room_id, + value.clone(), + db, + pub_key_map, + ) + .await + { + Ok((pdu, json)) => { + if next_id == id { + pdus.push((pdu, Some(json))); + } + } + Err(e) => { + warn!("Authentication of event {} failed: {:?}", next_id, e); + back_off((**next_id).to_owned()); + } + } + } + } + pdus + }) +} + + + +fn fetch_unknown_prev_events(initial_set: Vec>) -> Vec> { + let mut graph: HashMap, _> = HashMap::new(); + let mut eventid_info = HashMap::new(); + let mut todo_outlier_stack: Vec> = initial_set; + + let mut amount = 0; + + while let Some(prev_event_id) = todo_outlier_stack.pop() { + if let Some((pdu, json_opt)) = fetch_and_handle_outliers( + db, + origin, + &[prev_event_id.clone()], + &create_event, + room_id, + pub_key_map, + ) + .await + .pop() + { + if amount > 100 { + // Max limit reached + warn!("Max prev event limit reached!"); + graph.insert(prev_event_id.clone(), HashSet::new()); + continue; + } + + if let Some(json) = + json_opt.or_else(|| db.rooms.get_outlier_pdu_json(&prev_event_id).ok().flatten()) + { + if pdu.origin_server_ts > first_pdu_in_room.origin_server_ts { + amount += 1; + for prev_prev in &pdu.prev_events { + if !graph.contains_key(prev_prev) { + todo_outlier_stack.push(dbg!(prev_prev.clone())); + } + } + + graph.insert( + prev_event_id.clone(), + pdu.prev_events.iter().cloned().collect(), + ); + } else { + // Time based check failed + graph.insert(prev_event_id.clone(), HashSet::new()); + } + + eventid_info.insert(prev_event_id.clone(), (pdu, json)); + } else { + // Get json failed, so this was not fetched over federation + graph.insert(prev_event_id.clone(), HashSet::new()); + } + } else { + // Fetch and handle failed + graph.insert(prev_event_id.clone(), HashSet::new()); + } + } + + let sorted = state_res::lexicographical_topological_sort(dbg!(&graph), |event_id| { + // This return value is the key used for sorting events, + // events are then sorted by power level, time, + // and lexically by event_id. + println!("{}", event_id); + Ok(( + int!(0), + MilliSecondsSinceUnixEpoch( + eventid_info + .get(event_id) + .map_or_else(|| uint!(0), |info| info.0.origin_server_ts), + ), + )) + }) + .map_err(|_| "Error sorting prev events".to_owned())?; + + sorted +} diff --git a/src/service/rooms/lazy_loading.rs b/src/service/rooms/lazy_loading/mod.rs similarity index 100% rename from src/service/rooms/lazy_loading.rs rename to src/service/rooms/lazy_loading/mod.rs diff --git a/src/service/rooms/metadata.rs b/src/service/rooms/metadata/mod.rs similarity index 100% rename from src/service/rooms/metadata.rs rename to src/service/rooms/metadata/mod.rs diff --git a/src/service/rooms/outlier.rs b/src/service/rooms/outlier/mod.rs similarity index 65% rename from src/service/rooms/outlier.rs rename to src/service/rooms/outlier/mod.rs index afb0a147..340e93e4 100644 --- a/src/service/rooms/outlier.rs +++ b/src/service/rooms/outlier/mod.rs @@ -1,3 +1,12 @@ + /// Returns the pdu from the outlier tree. + pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result> { + self.eventid_outlierpdu + .get(event_id.as_bytes())? + .map_or(Ok(None), |pdu| { + serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) + }) + } + /// Returns the pdu from the outlier tree. pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result> { self.eventid_outlierpdu @@ -8,8 +17,6 @@ } /// Append the PDU as an outlier. - /// - /// Any event given to this will be processed (state-res) on another thread. #[tracing::instrument(skip(self, pdu))] pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> { self.eventid_outlierpdu.insert( diff --git a/src/service/rooms/pdu_metadata.rs b/src/service/rooms/pdu_metadata/mod.rs similarity index 100% rename from src/service/rooms/pdu_metadata.rs rename to src/service/rooms/pdu_metadata/mod.rs diff --git a/src/service/rooms/search.rs b/src/service/rooms/search/mod.rs similarity index 100% rename from src/service/rooms/search.rs rename to src/service/rooms/search/mod.rs diff --git a/src/service/rooms/short.rs b/src/service/rooms/short/mod.rs similarity index 100% rename from src/service/rooms/short.rs rename to src/service/rooms/short/mod.rs diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs new file mode 100644 index 00000000..4b42ca8e --- /dev/null +++ b/src/service/rooms/state/data.rs @@ -0,0 +1,16 @@ +pub trait Data { + fn get_room_shortstatehash(room_id: &RoomId); +} + + /// Returns the last state hash key added to the db for the given room. + #[tracing::instrument(skip(self))] + pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result> { + self.roomid_shortstatehash + .get(room_id.as_bytes())? + .map_or(Ok(None), |bytes| { + Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Invalid shortstatehash in roomid_shortstatehash") + })?)) + }) + } + diff --git a/src/service/rooms/state.rs b/src/service/rooms/state/mod.rs similarity index 65% rename from src/service/rooms/state.rs rename to src/service/rooms/state/mod.rs index 4c75467f..eddfe9e0 100644 --- a/src/service/rooms/state.rs +++ b/src/service/rooms/state/mod.rs @@ -1,133 +1,8 @@ +pub struct Service { + db: D, +} - /// Builds a StateMap by iterating over all keys that start - /// with state_hash, this gives the full state for the given state_hash. - #[tracing::instrument(skip(self))] - pub async fn state_full_ids(&self, shortstatehash: u64) -> Result>> { - let full_state = self - .load_shortstatehash_info(shortstatehash)? - .pop() - .expect("there is always one layer") - .1; - let mut result = BTreeMap::new(); - let mut i = 0; - for compressed in full_state.into_iter() { - let parsed = self.parse_compressed_state_event(compressed)?; - result.insert(parsed.0, parsed.1); - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } - Ok(result) - } - - #[tracing::instrument(skip(self))] - pub async fn state_full( - &self, - shortstatehash: u64, - ) -> Result>> { - let full_state = self - .load_shortstatehash_info(shortstatehash)? - .pop() - .expect("there is always one layer") - .1; - - let mut result = HashMap::new(); - let mut i = 0; - for compressed in full_state { - let (_, eventid) = self.parse_compressed_state_event(compressed)?; - if let Some(pdu) = self.get_pdu(&eventid)? { - result.insert( - ( - pdu.kind.to_string().into(), - pdu.state_key - .as_ref() - .ok_or_else(|| Error::bad_database("State event has no state key."))? - .clone(), - ), - pdu, - ); - } - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } - - Ok(result) - } - - /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). - #[tracing::instrument(skip(self))] - pub fn state_get_id( - &self, - shortstatehash: u64, - event_type: &StateEventType, - state_key: &str, - ) -> Result>> { - let shortstatekey = match self.get_shortstatekey(event_type, state_key)? { - Some(s) => s, - None => return Ok(None), - }; - let full_state = self - .load_shortstatehash_info(shortstatehash)? - .pop() - .expect("there is always one layer") - .1; - Ok(full_state - .into_iter() - .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) - .and_then(|compressed| { - self.parse_compressed_state_event(compressed) - .ok() - .map(|(_, id)| id) - })) - } - - /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). - #[tracing::instrument(skip(self))] - pub fn state_get( - &self, - shortstatehash: u64, - event_type: &StateEventType, - state_key: &str, - ) -> Result>> { - self.state_get_id(shortstatehash, event_type, state_key)? - .map_or(Ok(None), |event_id| self.get_pdu(&event_id)) - } - - /// Returns the state hash for this pdu. - pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { - self.eventid_shorteventid - .get(event_id.as_bytes())? - .map_or(Ok(None), |shorteventid| { - self.shorteventid_shortstatehash - .get(&shorteventid)? - .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database( - "Invalid shortstatehash bytes in shorteventid_shortstatehash", - ) - }) - }) - .transpose() - }) - } - - /// Returns the last state hash key added to the db for the given room. - #[tracing::instrument(skip(self))] - pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result> { - self.roomid_shortstatehash - .get(room_id.as_bytes())? - .map_or(Ok(None), |bytes| { - Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Invalid shortstatehash in roomid_shortstatehash") - })?)) - }) - } - +impl Service { /// Force the creation of a new StateHash and insert it into the db. /// /// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot. @@ -138,7 +13,7 @@ new_state_ids_compressed: HashSet, db: &Database, ) -> Result<()> { - let previous_shortstatehash = self.current_shortstatehash(room_id)?; + let previous_shortstatehash = self.d.current_shortstatehash(room_id)?; let state_hash = self.calculate_hash( &new_state_ids_compressed @@ -237,49 +112,6 @@ Ok(()) } - /// Returns the full room state. - #[tracing::instrument(skip(self))] - pub async fn room_state_full( - &self, - room_id: &RoomId, - ) -> Result>> { - if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { - self.state_full(current_shortstatehash).await - } else { - Ok(HashMap::new()) - } - } - - /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). - #[tracing::instrument(skip(self))] - pub fn room_state_get_id( - &self, - room_id: &RoomId, - event_type: &StateEventType, - state_key: &str, - ) -> Result>> { - if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { - self.state_get_id(current_shortstatehash, event_type, state_key) - } else { - Ok(None) - } - } - - /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). - #[tracing::instrument(skip(self))] - pub fn room_state_get( - &self, - room_id: &RoomId, - event_type: &StateEventType, - state_key: &str, - ) -> Result>> { - if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { - self.state_get(current_shortstatehash, event_type, state_key) - } else { - Ok(None) - } - } - /// Returns the leaf pdus of a room. #[tracing::instrument(skip(self))] pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result>> { @@ -507,3 +339,4 @@ Ok(()) } +} diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs new file mode 100644 index 00000000..ae26a7c4 --- /dev/null +++ b/src/service/rooms/state_accessor/mod.rs @@ -0,0 +1,160 @@ + /// Builds a StateMap by iterating over all keys that start + /// with state_hash, this gives the full state for the given state_hash. + #[tracing::instrument(skip(self))] + pub async fn state_full_ids(&self, shortstatehash: u64) -> Result>> { + let full_state = self + .load_shortstatehash_info(shortstatehash)? + .pop() + .expect("there is always one layer") + .1; + let mut result = BTreeMap::new(); + let mut i = 0; + for compressed in full_state.into_iter() { + let parsed = self.parse_compressed_state_event(compressed)?; + result.insert(parsed.0, parsed.1); + + i += 1; + if i % 100 == 0 { + tokio::task::yield_now().await; + } + } + Ok(result) + } + + #[tracing::instrument(skip(self))] + pub async fn state_full( + &self, + shortstatehash: u64, + ) -> Result>> { + let full_state = self + .load_shortstatehash_info(shortstatehash)? + .pop() + .expect("there is always one layer") + .1; + + let mut result = HashMap::new(); + let mut i = 0; + for compressed in full_state { + let (_, eventid) = self.parse_compressed_state_event(compressed)?; + if let Some(pdu) = self.get_pdu(&eventid)? { + result.insert( + ( + pdu.kind.to_string().into(), + pdu.state_key + .as_ref() + .ok_or_else(|| Error::bad_database("State event has no state key."))? + .clone(), + ), + pdu, + ); + } + + i += 1; + if i % 100 == 0 { + tokio::task::yield_now().await; + } + } + + Ok(result) + } + + /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). + #[tracing::instrument(skip(self))] + pub fn state_get_id( + &self, + shortstatehash: u64, + event_type: &StateEventType, + state_key: &str, + ) -> Result>> { + let shortstatekey = match self.get_shortstatekey(event_type, state_key)? { + Some(s) => s, + None => return Ok(None), + }; + let full_state = self + .load_shortstatehash_info(shortstatehash)? + .pop() + .expect("there is always one layer") + .1; + Ok(full_state + .into_iter() + .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) + .and_then(|compressed| { + self.parse_compressed_state_event(compressed) + .ok() + .map(|(_, id)| id) + })) + } + + /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). + #[tracing::instrument(skip(self))] + pub fn state_get( + &self, + shortstatehash: u64, + event_type: &StateEventType, + state_key: &str, + ) -> Result>> { + self.state_get_id(shortstatehash, event_type, state_key)? + .map_or(Ok(None), |event_id| self.get_pdu(&event_id)) + } + + /// Returns the state hash for this pdu. + pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { + self.eventid_shorteventid + .get(event_id.as_bytes())? + .map_or(Ok(None), |shorteventid| { + self.shorteventid_shortstatehash + .get(&shorteventid)? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database( + "Invalid shortstatehash bytes in shorteventid_shortstatehash", + ) + }) + }) + .transpose() + }) + } + + /// Returns the full room state. + #[tracing::instrument(skip(self))] + pub async fn room_state_full( + &self, + room_id: &RoomId, + ) -> Result>> { + if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { + self.state_full(current_shortstatehash).await + } else { + Ok(HashMap::new()) + } + } + + /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). + #[tracing::instrument(skip(self))] + pub fn room_state_get_id( + &self, + room_id: &RoomId, + event_type: &StateEventType, + state_key: &str, + ) -> Result>> { + if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { + self.state_get_id(current_shortstatehash, event_type, state_key) + } else { + Ok(None) + } + } + + /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). + #[tracing::instrument(skip(self))] + pub fn room_state_get( + &self, + room_id: &RoomId, + event_type: &StateEventType, + state_key: &str, + ) -> Result>> { + if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { + self.state_get(current_shortstatehash, event_type, state_key) + } else { + Ok(None) + } + } + diff --git a/src/service/rooms/state_cache.rs b/src/service/rooms/state_cache/mod.rs similarity index 100% rename from src/service/rooms/state_cache.rs rename to src/service/rooms/state_cache/mod.rs diff --git a/src/service/rooms/state_compressor.rs b/src/service/rooms/state_compressor/mod.rs similarity index 100% rename from src/service/rooms/state_compressor.rs rename to src/service/rooms/state_compressor/mod.rs diff --git a/src/service/rooms/timeline.rs b/src/service/rooms/timeline/mod.rs similarity index 93% rename from src/service/rooms/timeline.rs rename to src/service/rooms/timeline/mod.rs index fd93344c..6299b16c 100644 --- a/src/service/rooms/timeline.rs +++ b/src/service/rooms/timeline/mod.rs @@ -100,16 +100,6 @@ .transpose() } - /// Returns the json of a pdu. - pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result> { - self.eventid_outlierpdu - .get(event_id.as_bytes())? - .map(|pdu| { - serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) - }) - .transpose() - } - /// Returns the json of a pdu. pub fn get_non_outlier_pdu_json( &self, @@ -487,211 +477,6 @@ _ => {} } - Ok(pdu_id) - } - - /// Creates a new persisted data unit and adds it to a room. - #[tracing::instrument(skip(self, db, _mutex_lock))] - pub fn build_and_append_pdu( - &self, - pdu_builder: PduBuilder, - sender: &UserId, - room_id: &RoomId, - db: &Database, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex - ) -> Result> { - let PduBuilder { - event_type, - content, - unsigned, - state_key, - redacts, - } = pdu_builder; - - let prev_events = self - .get_pdu_leaves(room_id)? - .into_iter() - .take(20) - .collect::>(); - - let create_event = self.room_state_get(room_id, &StateEventType::RoomCreate, "")?; - - let create_event_content: Option = create_event - .as_ref() - .map(|create_event| { - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - Error::bad_database("Invalid create event in db.") - }) - }) - .transpose()?; - - // If there was no create event yet, assume we are creating a room with the default - // version right now - let room_version_id = create_event_content - .map_or(db.globals.default_room_version(), |create_event| { - create_event.room_version - }); - let room_version = RoomVersion::new(&room_version_id).expect("room version is supported"); - - let auth_events = - self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?; - - // Our depth is the maximum depth of prev_events + 1 - let depth = prev_events - .iter() - .filter_map(|event_id| Some(self.get_pdu(event_id).ok()??.depth)) - .max() - .unwrap_or_else(|| uint!(0)) - + uint!(1); - - let mut unsigned = unsigned.unwrap_or_default(); - if let Some(state_key) = &state_key { - if let Some(prev_pdu) = - self.room_state_get(room_id, &event_type.to_string().into(), state_key)? - { - unsigned.insert( - "prev_content".to_owned(), - serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"), - ); - unsigned.insert( - "prev_sender".to_owned(), - serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"), - ); - } - } - - let mut pdu = PduEvent { - event_id: ruma::event_id!("$thiswillbefilledinlater").into(), - room_id: room_id.to_owned(), - sender: sender.to_owned(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - kind: event_type, - content, - state_key, - prev_events, - depth, - auth_events: auth_events - .iter() - .map(|(_, pdu)| pdu.event_id.clone()) - .collect(), - redacts, - unsigned: if unsigned.is_empty() { - None - } else { - Some(to_raw_value(&unsigned).expect("to_raw_value always works")) - }, - hashes: EventHash { - sha256: "aaa".to_owned(), - }, - signatures: None, - }; - - let auth_check = state_res::auth_check( - &room_version, - &pdu, - None::, // TODO: third_party_invite - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|e| { - error!("{:?}", e); - Error::bad_database("Auth check failed.") - })?; - - if !auth_check { - return Err(Error::BadRequest( - ErrorKind::Forbidden, - "Event is not authorized.", - )); - } - - // Hash and sign - let mut pdu_json = - utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); - - pdu_json.remove("event_id"); - - // Add origin because synapse likes that (and it's required in the spec) - pdu_json.insert( - "origin".to_owned(), - CanonicalJsonValue::String(db.globals.server_name().as_ref().to_owned()), - ); - - match ruma::signatures::hash_and_sign_event( - db.globals.server_name().as_str(), - db.globals.keypair(), - &mut pdu_json, - &room_version_id, - ) { - Ok(_) => {} - Err(e) => { - return match e { - ruma::signatures::Error::PduSize => Err(Error::BadRequest( - ErrorKind::TooLarge, - "Message is too long", - )), - _ => Err(Error::BadRequest( - ErrorKind::Unknown, - "Signing event failed", - )), - } - } - } - - // Generate event id - pdu.event_id = EventId::parse_arc(format!( - "${}", - ruma::signatures::reference_hash(&pdu_json, &room_version_id) - .expect("ruma can calculate reference hashes") - )) - .expect("ruma's reference hashes are valid event ids"); - - pdu_json.insert( - "event_id".to_owned(), - CanonicalJsonValue::String(pdu.event_id.as_str().to_owned()), - ); - - // Generate short event id - let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?; - - // We append to state before appending the pdu, so we don't have a moment in time with the - // pdu without it's state. This is okay because append_pdu can't fail. - let statehashid = self.append_to_state(&pdu, &db.globals)?; - - let pdu_id = self.append_pdu( - &pdu, - pdu_json, - // Since this PDU references all pdu_leaves we can update the leaves - // of the room - iter::once(&*pdu.event_id), - db, - )?; - - // We set the room state after inserting the pdu, so that we never have a moment in time - // where events in the current room state do not exist - self.set_room_state(room_id, statehashid)?; - - let mut servers: HashSet> = - self.room_servers(room_id).filter_map(|r| r.ok()).collect(); - - // In case we are kicking or banning a user, we need to inform their server of the change - if pdu.kind == RoomEventType::RoomMember { - if let Some(state_key_uid) = &pdu - .state_key - .as_ref() - .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) - { - servers.insert(Box::from(state_key_uid.server_name())); - } - } - - // Remove our server from the server list since it will be added to it by room_servers() and/or the if statement above - servers.remove(db.globals.server_name()); - - db.sending.send_pdu(servers.into_iter(), &pdu_id)?; - for appservice in db.appservice.all()? { if self.appservice_in_room(room_id, &appservice, db)? { db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; @@ -768,9 +553,268 @@ } } + + Ok(pdu_id) + } + + pub fn create_hash_and_sign_event( + &self, + pdu_builder: PduBuilder, + sender: &UserId, + room_id: &RoomId, + db: &Database, + _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + ) -> (PduEvent, CanonicalJsonObj) { + let PduBuilder { + event_type, + content, + unsigned, + state_key, + redacts, + } = pdu_builder; + + let prev_events: Vec<_> = db + .rooms + .get_pdu_leaves(room_id)? + .into_iter() + .take(20) + .collect(); + + let create_event = db + .rooms + .room_state_get(room_id, &StateEventType::RoomCreate, "")?; + + let create_event_content: Option = create_event + .as_ref() + .map(|create_event| { + serde_json::from_str(create_event.content.get()).map_err(|e| { + warn!("Invalid create event: {}", e); + Error::bad_database("Invalid create event in db.") + }) + }) + .transpose()?; + + // If there was no create event yet, assume we are creating a room with the default + // version right now + let room_version_id = create_event_content + .map_or(db.globals.default_room_version(), |create_event| { + create_event.room_version + }); + let room_version = + RoomVersion::new(&room_version_id).expect("room version is supported"); + + let auth_events = + self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?; + + // Our depth is the maximum depth of prev_events + 1 + let depth = prev_events + .iter() + .filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth)) + .max() + .unwrap_or_else(|| uint!(0)) + + uint!(1); + + let mut unsigned = unsigned.unwrap_or_default(); + + if let Some(state_key) = &state_key { + if let Some(prev_pdu) = + self.room_state_get(room_id, &event_type.to_string().into(), state_key)? + { + unsigned.insert( + "prev_content".to_owned(), + serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"), + ); + unsigned.insert( + "prev_sender".to_owned(), + serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"), + ); + } + } + + let pdu = PduEvent { + event_id: ruma::event_id!("$thiswillbefilledinlater").into(), + room_id: room_id.to_owned(), + sender: sender_user.to_owned(), + origin_server_ts: utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"), + kind: event_type, + content, + state_key, + prev_events, + depth, + auth_events: auth_events + .iter() + .map(|(_, pdu)| pdu.event_id.clone()) + .collect(), + redacts, + unsigned: if unsigned.is_empty() { + None + } else { + Some(to_raw_value(&unsigned).expect("to_raw_value always works")) + }, + hashes: EventHash { + sha256: "aaa".to_owned(), + }, + signatures: None, + }; + + let auth_check = state_res::auth_check( + &room_version, + &pdu, + None::, // TODO: third_party_invite + |k, s| auth_events.get(&(k.clone(), s.to_owned())), + ) + .map_err(|e| { + error!("{:?}", e); + Error::bad_database("Auth check failed.") + })?; + + if !auth_check { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Event is not authorized.", + )); + } + + // Hash and sign + let mut pdu_json = + utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); + + pdu_json.remove("event_id"); + + // Add origin because synapse likes that (and it's required in the spec) + pdu_json.insert( + "origin".to_owned(), + to_canonical_value(db.globals.server_name()) + .expect("server name is a valid CanonicalJsonValue"), + ); + + match ruma::signatures::hash_and_sign_event( + db.globals.server_name().as_str(), + db.globals.keypair(), + &mut pdu_json, + &room_version_id, + ) { + Ok(_) => {} + Err(e) => { + return match e { + ruma::signatures::Error::PduSize => Err(Error::BadRequest( + ErrorKind::TooLarge, + "Message is too long", + )), + _ => Err(Error::BadRequest( + ErrorKind::Unknown, + "Signing event failed", + )), + } + } + } + + // Generate event id + pdu.event_id = EventId::parse_arc(format!( + "${}", + ruma::signatures::reference_hash(&pdu_json, &room_version_id) + .expect("ruma can calculate reference hashes") + )) + .expect("ruma's reference hashes are valid event ids"); + + pdu_json.insert( + "event_id".to_owned(), + CanonicalJsonValue::String(pdu.event_id.as_str().to_owned()), + ); + + // Generate short event id + let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?; + } + + /// Creates a new persisted data unit and adds it to a room. This function takes a + /// roomid_mutex_state, meaning that only this function is able to mutate the room state. + #[tracing::instrument(skip(self, db, _mutex_lock))] + pub fn build_and_append_pdu( + &self, + pdu_builder: PduBuilder, + sender: &UserId, + room_id: &RoomId, + db: &Database, + _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + ) -> Result> { + + let (pdu, pdu_json) = create_hash_and_sign_event()?; + + + // We append to state before appending the pdu, so we don't have a moment in time with the + // pdu without it's state. This is okay because append_pdu can't fail. + let statehashid = self.append_to_state(&pdu, &db.globals)?; + + let pdu_id = self.append_pdu( + &pdu, + pdu_json, + // Since this PDU references all pdu_leaves we can update the leaves + // of the room + iter::once(&*pdu.event_id), + db, + )?; + + // We set the room state after inserting the pdu, so that we never have a moment in time + // where events in the current room state do not exist + self.set_room_state(room_id, statehashid)?; + + let mut servers: HashSet> = + self.room_servers(room_id).filter_map(|r| r.ok()).collect(); + + // In case we are kicking or banning a user, we need to inform their server of the change + if pdu.kind == RoomEventType::RoomMember { + if let Some(state_key_uid) = &pdu + .state_key + .as_ref() + .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) + { + servers.insert(Box::from(state_key_uid.server_name())); + } + } + + // Remove our server from the server list since it will be added to it by room_servers() and/or the if statement above + servers.remove(db.globals.server_name()); + + db.sending.send_pdu(servers.into_iter(), &pdu_id)?; + Ok(pdu.event_id) } + /// Append the incoming event setting the state snapshot to the state from the + /// server that sent the event. + #[tracing::instrument(skip_all)] + fn append_incoming_pdu<'a>( + db: &Database, + pdu: &PduEvent, + pdu_json: CanonicalJsonObject, + new_room_leaves: impl IntoIterator + Clone + Debug, + state_ids_compressed: HashSet, + soft_fail: bool, + _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + ) -> Result>> { + // We append to state before appending the pdu, so we don't have a moment in time with the + // pdu without it's state. This is okay because append_pdu can't fail. + db.rooms.set_event_state( + &pdu.event_id, + &pdu.room_id, + state_ids_compressed, + &db.globals, + )?; + + if soft_fail { + db.rooms + .mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; + db.rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?; + return Ok(None); + } + + let pdu_id = db.rooms.append_pdu(pdu, pdu_json, new_room_leaves, db)?; + + Ok(Some(pdu_id)) + } + /// Returns an iterator over all PDUs in a room. #[tracing::instrument(skip(self))] pub fn all_pdus<'a>( diff --git a/src/service/rooms/user.rs b/src/service/rooms/user/mod.rs similarity index 100% rename from src/service/rooms/user.rs rename to src/service/rooms/user/mod.rs