mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-06-27 16:35:59 +00:00
refactor: event handling code
This commit is contained in:
parent
9c71a2cd5e
commit
3fd7f6efc2
19 changed files with 1604 additions and 1867 deletions
|
@ -806,36 +806,6 @@ pub(crate) async fn invite_helper<'a>(
|
||||||
);
|
);
|
||||||
let state_lock = mutex_state.lock().await;
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
let prev_events: Vec<_> = db
|
|
||||||
.rooms
|
|
||||||
.get_pdu_leaves(room_id)?
|
|
||||||
.into_iter()
|
|
||||||
.take(20)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let create_event = db
|
|
||||||
.rooms
|
|
||||||
.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
|
|
||||||
|
|
||||||
let create_event_content: Option<RoomCreateEventContent> = create_event
|
|
||||||
.as_ref()
|
|
||||||
.map(|create_event| {
|
|
||||||
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
|
||||||
warn!("Invalid create event: {}", e);
|
|
||||||
Error::bad_database("Invalid create event in db.")
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
// If there was no create event yet, assume we are creating a room with the default
|
|
||||||
// version right now
|
|
||||||
let room_version_id = create_event_content
|
|
||||||
.map_or(db.globals.default_room_version(), |create_event| {
|
|
||||||
create_event.room_version
|
|
||||||
});
|
|
||||||
let room_version =
|
|
||||||
RoomVersion::new(&room_version_id).expect("room version is supported");
|
|
||||||
|
|
||||||
let content = to_raw_value(&RoomMemberEventContent {
|
let content = to_raw_value(&RoomMemberEventContent {
|
||||||
avatar_url: None,
|
avatar_url: None,
|
||||||
displayname: None,
|
displayname: None,
|
||||||
|
@ -851,98 +821,7 @@ pub(crate) async fn invite_helper<'a>(
|
||||||
let state_key = user_id.to_string();
|
let state_key = user_id.to_string();
|
||||||
let kind = StateEventType::RoomMember;
|
let kind = StateEventType::RoomMember;
|
||||||
|
|
||||||
let auth_events = db.rooms.get_auth_events(
|
let (pdu, pdu_json) = create_hash_and_sign_event();
|
||||||
room_id,
|
|
||||||
&kind.to_string().into(),
|
|
||||||
sender_user,
|
|
||||||
Some(&state_key),
|
|
||||||
&content,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// Our depth is the maximum depth of prev_events + 1
|
|
||||||
let depth = prev_events
|
|
||||||
.iter()
|
|
||||||
.filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth))
|
|
||||||
.max()
|
|
||||||
.unwrap_or_else(|| uint!(0))
|
|
||||||
+ uint!(1);
|
|
||||||
|
|
||||||
let mut unsigned = BTreeMap::new();
|
|
||||||
|
|
||||||
if let Some(prev_pdu) = db.rooms.room_state_get(room_id, &kind, &state_key)? {
|
|
||||||
unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone());
|
|
||||||
unsigned.insert(
|
|
||||||
"prev_sender".to_owned(),
|
|
||||||
to_raw_value(&prev_pdu.sender).expect("UserId is valid"),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let pdu = PduEvent {
|
|
||||||
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
|
|
||||||
room_id: room_id.to_owned(),
|
|
||||||
sender: sender_user.to_owned(),
|
|
||||||
origin_server_ts: utils::millis_since_unix_epoch()
|
|
||||||
.try_into()
|
|
||||||
.expect("time is valid"),
|
|
||||||
kind: kind.to_string().into(),
|
|
||||||
content,
|
|
||||||
state_key: Some(state_key),
|
|
||||||
prev_events,
|
|
||||||
depth,
|
|
||||||
auth_events: auth_events
|
|
||||||
.iter()
|
|
||||||
.map(|(_, pdu)| pdu.event_id.clone())
|
|
||||||
.collect(),
|
|
||||||
redacts: None,
|
|
||||||
unsigned: if unsigned.is_empty() {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
|
|
||||||
},
|
|
||||||
hashes: EventHash {
|
|
||||||
sha256: "aaa".to_owned(),
|
|
||||||
},
|
|
||||||
signatures: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let auth_check = state_res::auth_check(
|
|
||||||
&room_version,
|
|
||||||
&pdu,
|
|
||||||
None::<PduEvent>, // TODO: third_party_invite
|
|
||||||
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
|
|
||||||
)
|
|
||||||
.map_err(|e| {
|
|
||||||
error!("{:?}", e);
|
|
||||||
Error::bad_database("Auth check failed.")
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if !auth_check {
|
|
||||||
return Err(Error::BadRequest(
|
|
||||||
ErrorKind::Forbidden,
|
|
||||||
"Event is not authorized.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Hash and sign
|
|
||||||
let mut pdu_json =
|
|
||||||
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
|
|
||||||
|
|
||||||
pdu_json.remove("event_id");
|
|
||||||
|
|
||||||
// Add origin because synapse likes that (and it's required in the spec)
|
|
||||||
pdu_json.insert(
|
|
||||||
"origin".to_owned(),
|
|
||||||
to_canonical_value(db.globals.server_name())
|
|
||||||
.expect("server name is a valid CanonicalJsonValue"),
|
|
||||||
);
|
|
||||||
|
|
||||||
ruma::signatures::hash_and_sign_event(
|
|
||||||
db.globals.server_name().as_str(),
|
|
||||||
db.globals.keypair(),
|
|
||||||
&mut pdu_json,
|
|
||||||
&room_version_id,
|
|
||||||
)
|
|
||||||
.expect("event is valid, we just created it");
|
|
||||||
|
|
||||||
let invite_room_state = db.rooms.calculate_invite_state(&pdu)?;
|
let invite_room_state = db.rooms.calculate_invite_state(&pdu)?;
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load diff
1152
src/service/rooms/event_handler/mod.rs
Normal file
1152
src/service/rooms/event_handler/mod.rs
Normal file
File diff suppressed because it is too large
Load diff
|
@ -1,3 +1,12 @@
|
||||||
|
/// Returns the pdu from the outlier tree.
|
||||||
|
pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
|
||||||
|
self.eventid_outlierpdu
|
||||||
|
.get(event_id.as_bytes())?
|
||||||
|
.map_or(Ok(None), |pdu| {
|
||||||
|
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the pdu from the outlier tree.
|
/// Returns the pdu from the outlier tree.
|
||||||
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
|
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
|
||||||
self.eventid_outlierpdu
|
self.eventid_outlierpdu
|
||||||
|
@ -8,8 +17,6 @@
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append the PDU as an outlier.
|
/// Append the PDU as an outlier.
|
||||||
///
|
|
||||||
/// Any event given to this will be processed (state-res) on another thread.
|
|
||||||
#[tracing::instrument(skip(self, pdu))]
|
#[tracing::instrument(skip(self, pdu))]
|
||||||
pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> {
|
pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> {
|
||||||
self.eventid_outlierpdu.insert(
|
self.eventid_outlierpdu.insert(
|
16
src/service/rooms/state/data.rs
Normal file
16
src/service/rooms/state/data.rs
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
pub trait Data {
|
||||||
|
fn get_room_shortstatehash(room_id: &RoomId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the last state hash key added to the db for the given room.
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> {
|
||||||
|
self.roomid_shortstatehash
|
||||||
|
.get(room_id.as_bytes())?
|
||||||
|
.map_or(Ok(None), |bytes| {
|
||||||
|
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
||||||
|
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash")
|
||||||
|
})?))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -1,133 +1,8 @@
|
||||||
|
pub struct Service<D: Data> {
|
||||||
|
db: D,
|
||||||
|
}
|
||||||
|
|
||||||
/// Builds a StateMap by iterating over all keys that start
|
impl Service {
|
||||||
/// with state_hash, this gives the full state for the given state_hash.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
let mut result = BTreeMap::new();
|
|
||||||
let mut i = 0;
|
|
||||||
for compressed in full_state.into_iter() {
|
|
||||||
let parsed = self.parse_compressed_state_event(compressed)?;
|
|
||||||
result.insert(parsed.0, parsed.1);
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn state_full(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
|
|
||||||
let mut result = HashMap::new();
|
|
||||||
let mut i = 0;
|
|
||||||
for compressed in full_state {
|
|
||||||
let (_, eventid) = self.parse_compressed_state_event(compressed)?;
|
|
||||||
if let Some(pdu) = self.get_pdu(&eventid)? {
|
|
||||||
result.insert(
|
|
||||||
(
|
|
||||||
pdu.kind.to_string().into(),
|
|
||||||
pdu.state_key
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| Error::bad_database("State event has no state key."))?
|
|
||||||
.clone(),
|
|
||||||
),
|
|
||||||
pdu,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_get_id(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<EventId>>> {
|
|
||||||
let shortstatekey = match self.get_shortstatekey(event_type, state_key)? {
|
|
||||||
Some(s) => s,
|
|
||||||
None => return Ok(None),
|
|
||||||
};
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
Ok(full_state
|
|
||||||
.into_iter()
|
|
||||||
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
|
|
||||||
.and_then(|compressed| {
|
|
||||||
self.parse_compressed_state_event(compressed)
|
|
||||||
.ok()
|
|
||||||
.map(|(_, id)| id)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_get(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<PduEvent>>> {
|
|
||||||
self.state_get_id(shortstatehash, event_type, state_key)?
|
|
||||||
.map_or(Ok(None), |event_id| self.get_pdu(&event_id))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the state hash for this pdu.
|
|
||||||
pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
|
|
||||||
self.eventid_shorteventid
|
|
||||||
.get(event_id.as_bytes())?
|
|
||||||
.map_or(Ok(None), |shorteventid| {
|
|
||||||
self.shorteventid_shortstatehash
|
|
||||||
.get(&shorteventid)?
|
|
||||||
.map(|bytes| {
|
|
||||||
utils::u64_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database(
|
|
||||||
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
|
|
||||||
)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.transpose()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the last state hash key added to the db for the given room.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> {
|
|
||||||
self.roomid_shortstatehash
|
|
||||||
.get(room_id.as_bytes())?
|
|
||||||
.map_or(Ok(None), |bytes| {
|
|
||||||
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash")
|
|
||||||
})?))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Force the creation of a new StateHash and insert it into the db.
|
/// Force the creation of a new StateHash and insert it into the db.
|
||||||
///
|
///
|
||||||
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
|
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
|
||||||
|
@ -138,7 +13,7 @@
|
||||||
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let previous_shortstatehash = self.current_shortstatehash(room_id)?;
|
let previous_shortstatehash = self.d.current_shortstatehash(room_id)?;
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
let state_hash = self.calculate_hash(
|
||||||
&new_state_ids_compressed
|
&new_state_ids_compressed
|
||||||
|
@ -237,49 +112,6 @@
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the full room state.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn room_state_full(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_full(current_shortstatehash).await
|
|
||||||
} else {
|
|
||||||
Ok(HashMap::new())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn room_state_get_id(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<EventId>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_get_id(current_shortstatehash, event_type, state_key)
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn room_state_get(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<PduEvent>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_get(current_shortstatehash, event_type, state_key)
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the leaf pdus of a room.
|
/// Returns the leaf pdus of a room.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>> {
|
pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>> {
|
||||||
|
@ -507,3 +339,4 @@
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
}
|
160
src/service/rooms/state_accessor/mod.rs
Normal file
160
src/service/rooms/state_accessor/mod.rs
Normal file
|
@ -0,0 +1,160 @@
|
||||||
|
/// Builds a StateMap by iterating over all keys that start
|
||||||
|
/// with state_hash, this gives the full state for the given state_hash.
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
|
||||||
|
let full_state = self
|
||||||
|
.load_shortstatehash_info(shortstatehash)?
|
||||||
|
.pop()
|
||||||
|
.expect("there is always one layer")
|
||||||
|
.1;
|
||||||
|
let mut result = BTreeMap::new();
|
||||||
|
let mut i = 0;
|
||||||
|
for compressed in full_state.into_iter() {
|
||||||
|
let parsed = self.parse_compressed_state_event(compressed)?;
|
||||||
|
result.insert(parsed.0, parsed.1);
|
||||||
|
|
||||||
|
i += 1;
|
||||||
|
if i % 100 == 0 {
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub async fn state_full(
|
||||||
|
&self,
|
||||||
|
shortstatehash: u64,
|
||||||
|
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
|
||||||
|
let full_state = self
|
||||||
|
.load_shortstatehash_info(shortstatehash)?
|
||||||
|
.pop()
|
||||||
|
.expect("there is always one layer")
|
||||||
|
.1;
|
||||||
|
|
||||||
|
let mut result = HashMap::new();
|
||||||
|
let mut i = 0;
|
||||||
|
for compressed in full_state {
|
||||||
|
let (_, eventid) = self.parse_compressed_state_event(compressed)?;
|
||||||
|
if let Some(pdu) = self.get_pdu(&eventid)? {
|
||||||
|
result.insert(
|
||||||
|
(
|
||||||
|
pdu.kind.to_string().into(),
|
||||||
|
pdu.state_key
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| Error::bad_database("State event has no state key."))?
|
||||||
|
.clone(),
|
||||||
|
),
|
||||||
|
pdu,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
i += 1;
|
||||||
|
if i % 100 == 0 {
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn state_get_id(
|
||||||
|
&self,
|
||||||
|
shortstatehash: u64,
|
||||||
|
event_type: &StateEventType,
|
||||||
|
state_key: &str,
|
||||||
|
) -> Result<Option<Arc<EventId>>> {
|
||||||
|
let shortstatekey = match self.get_shortstatekey(event_type, state_key)? {
|
||||||
|
Some(s) => s,
|
||||||
|
None => return Ok(None),
|
||||||
|
};
|
||||||
|
let full_state = self
|
||||||
|
.load_shortstatehash_info(shortstatehash)?
|
||||||
|
.pop()
|
||||||
|
.expect("there is always one layer")
|
||||||
|
.1;
|
||||||
|
Ok(full_state
|
||||||
|
.into_iter()
|
||||||
|
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
|
||||||
|
.and_then(|compressed| {
|
||||||
|
self.parse_compressed_state_event(compressed)
|
||||||
|
.ok()
|
||||||
|
.map(|(_, id)| id)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn state_get(
|
||||||
|
&self,
|
||||||
|
shortstatehash: u64,
|
||||||
|
event_type: &StateEventType,
|
||||||
|
state_key: &str,
|
||||||
|
) -> Result<Option<Arc<PduEvent>>> {
|
||||||
|
self.state_get_id(shortstatehash, event_type, state_key)?
|
||||||
|
.map_or(Ok(None), |event_id| self.get_pdu(&event_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the state hash for this pdu.
|
||||||
|
pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
|
||||||
|
self.eventid_shorteventid
|
||||||
|
.get(event_id.as_bytes())?
|
||||||
|
.map_or(Ok(None), |shorteventid| {
|
||||||
|
self.shorteventid_shortstatehash
|
||||||
|
.get(&shorteventid)?
|
||||||
|
.map(|bytes| {
|
||||||
|
utils::u64_from_bytes(&bytes).map_err(|_| {
|
||||||
|
Error::bad_database(
|
||||||
|
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the full room state.
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub async fn room_state_full(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
|
||||||
|
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
||||||
|
self.state_full(current_shortstatehash).await
|
||||||
|
} else {
|
||||||
|
Ok(HashMap::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn room_state_get_id(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
event_type: &StateEventType,
|
||||||
|
state_key: &str,
|
||||||
|
) -> Result<Option<Arc<EventId>>> {
|
||||||
|
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
||||||
|
self.state_get_id(current_shortstatehash, event_type, state_key)
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn room_state_get(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
event_type: &StateEventType,
|
||||||
|
state_key: &str,
|
||||||
|
) -> Result<Option<Arc<PduEvent>>> {
|
||||||
|
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
||||||
|
self.state_get(current_shortstatehash, event_type, state_key)
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -100,16 +100,6 @@
|
||||||
.transpose()
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the json of a pdu.
|
|
||||||
pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
|
|
||||||
self.eventid_outlierpdu
|
|
||||||
.get(event_id.as_bytes())?
|
|
||||||
.map(|pdu| {
|
|
||||||
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
|
||||||
})
|
|
||||||
.transpose()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the json of a pdu.
|
/// Returns the json of a pdu.
|
||||||
pub fn get_non_outlier_pdu_json(
|
pub fn get_non_outlier_pdu_json(
|
||||||
&self,
|
&self,
|
||||||
|
@ -487,211 +477,6 @@
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(pdu_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new persisted data unit and adds it to a room.
|
|
||||||
#[tracing::instrument(skip(self, db, _mutex_lock))]
|
|
||||||
pub fn build_and_append_pdu(
|
|
||||||
&self,
|
|
||||||
pdu_builder: PduBuilder,
|
|
||||||
sender: &UserId,
|
|
||||||
room_id: &RoomId,
|
|
||||||
db: &Database,
|
|
||||||
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex
|
|
||||||
) -> Result<Arc<EventId>> {
|
|
||||||
let PduBuilder {
|
|
||||||
event_type,
|
|
||||||
content,
|
|
||||||
unsigned,
|
|
||||||
state_key,
|
|
||||||
redacts,
|
|
||||||
} = pdu_builder;
|
|
||||||
|
|
||||||
let prev_events = self
|
|
||||||
.get_pdu_leaves(room_id)?
|
|
||||||
.into_iter()
|
|
||||||
.take(20)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let create_event = self.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
|
|
||||||
|
|
||||||
let create_event_content: Option<RoomCreateEventContent> = create_event
|
|
||||||
.as_ref()
|
|
||||||
.map(|create_event| {
|
|
||||||
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
|
||||||
warn!("Invalid create event: {}", e);
|
|
||||||
Error::bad_database("Invalid create event in db.")
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
// If there was no create event yet, assume we are creating a room with the default
|
|
||||||
// version right now
|
|
||||||
let room_version_id = create_event_content
|
|
||||||
.map_or(db.globals.default_room_version(), |create_event| {
|
|
||||||
create_event.room_version
|
|
||||||
});
|
|
||||||
let room_version = RoomVersion::new(&room_version_id).expect("room version is supported");
|
|
||||||
|
|
||||||
let auth_events =
|
|
||||||
self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?;
|
|
||||||
|
|
||||||
// Our depth is the maximum depth of prev_events + 1
|
|
||||||
let depth = prev_events
|
|
||||||
.iter()
|
|
||||||
.filter_map(|event_id| Some(self.get_pdu(event_id).ok()??.depth))
|
|
||||||
.max()
|
|
||||||
.unwrap_or_else(|| uint!(0))
|
|
||||||
+ uint!(1);
|
|
||||||
|
|
||||||
let mut unsigned = unsigned.unwrap_or_default();
|
|
||||||
if let Some(state_key) = &state_key {
|
|
||||||
if let Some(prev_pdu) =
|
|
||||||
self.room_state_get(room_id, &event_type.to_string().into(), state_key)?
|
|
||||||
{
|
|
||||||
unsigned.insert(
|
|
||||||
"prev_content".to_owned(),
|
|
||||||
serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"),
|
|
||||||
);
|
|
||||||
unsigned.insert(
|
|
||||||
"prev_sender".to_owned(),
|
|
||||||
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut pdu = PduEvent {
|
|
||||||
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
|
|
||||||
room_id: room_id.to_owned(),
|
|
||||||
sender: sender.to_owned(),
|
|
||||||
origin_server_ts: utils::millis_since_unix_epoch()
|
|
||||||
.try_into()
|
|
||||||
.expect("time is valid"),
|
|
||||||
kind: event_type,
|
|
||||||
content,
|
|
||||||
state_key,
|
|
||||||
prev_events,
|
|
||||||
depth,
|
|
||||||
auth_events: auth_events
|
|
||||||
.iter()
|
|
||||||
.map(|(_, pdu)| pdu.event_id.clone())
|
|
||||||
.collect(),
|
|
||||||
redacts,
|
|
||||||
unsigned: if unsigned.is_empty() {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
|
|
||||||
},
|
|
||||||
hashes: EventHash {
|
|
||||||
sha256: "aaa".to_owned(),
|
|
||||||
},
|
|
||||||
signatures: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let auth_check = state_res::auth_check(
|
|
||||||
&room_version,
|
|
||||||
&pdu,
|
|
||||||
None::<PduEvent>, // TODO: third_party_invite
|
|
||||||
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
|
|
||||||
)
|
|
||||||
.map_err(|e| {
|
|
||||||
error!("{:?}", e);
|
|
||||||
Error::bad_database("Auth check failed.")
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if !auth_check {
|
|
||||||
return Err(Error::BadRequest(
|
|
||||||
ErrorKind::Forbidden,
|
|
||||||
"Event is not authorized.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Hash and sign
|
|
||||||
let mut pdu_json =
|
|
||||||
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
|
|
||||||
|
|
||||||
pdu_json.remove("event_id");
|
|
||||||
|
|
||||||
// Add origin because synapse likes that (and it's required in the spec)
|
|
||||||
pdu_json.insert(
|
|
||||||
"origin".to_owned(),
|
|
||||||
CanonicalJsonValue::String(db.globals.server_name().as_ref().to_owned()),
|
|
||||||
);
|
|
||||||
|
|
||||||
match ruma::signatures::hash_and_sign_event(
|
|
||||||
db.globals.server_name().as_str(),
|
|
||||||
db.globals.keypair(),
|
|
||||||
&mut pdu_json,
|
|
||||||
&room_version_id,
|
|
||||||
) {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(e) => {
|
|
||||||
return match e {
|
|
||||||
ruma::signatures::Error::PduSize => Err(Error::BadRequest(
|
|
||||||
ErrorKind::TooLarge,
|
|
||||||
"Message is too long",
|
|
||||||
)),
|
|
||||||
_ => Err(Error::BadRequest(
|
|
||||||
ErrorKind::Unknown,
|
|
||||||
"Signing event failed",
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate event id
|
|
||||||
pdu.event_id = EventId::parse_arc(format!(
|
|
||||||
"${}",
|
|
||||||
ruma::signatures::reference_hash(&pdu_json, &room_version_id)
|
|
||||||
.expect("ruma can calculate reference hashes")
|
|
||||||
))
|
|
||||||
.expect("ruma's reference hashes are valid event ids");
|
|
||||||
|
|
||||||
pdu_json.insert(
|
|
||||||
"event_id".to_owned(),
|
|
||||||
CanonicalJsonValue::String(pdu.event_id.as_str().to_owned()),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Generate short event id
|
|
||||||
let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?;
|
|
||||||
|
|
||||||
// We append to state before appending the pdu, so we don't have a moment in time with the
|
|
||||||
// pdu without it's state. This is okay because append_pdu can't fail.
|
|
||||||
let statehashid = self.append_to_state(&pdu, &db.globals)?;
|
|
||||||
|
|
||||||
let pdu_id = self.append_pdu(
|
|
||||||
&pdu,
|
|
||||||
pdu_json,
|
|
||||||
// Since this PDU references all pdu_leaves we can update the leaves
|
|
||||||
// of the room
|
|
||||||
iter::once(&*pdu.event_id),
|
|
||||||
db,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// We set the room state after inserting the pdu, so that we never have a moment in time
|
|
||||||
// where events in the current room state do not exist
|
|
||||||
self.set_room_state(room_id, statehashid)?;
|
|
||||||
|
|
||||||
let mut servers: HashSet<Box<ServerName>> =
|
|
||||||
self.room_servers(room_id).filter_map(|r| r.ok()).collect();
|
|
||||||
|
|
||||||
// In case we are kicking or banning a user, we need to inform their server of the change
|
|
||||||
if pdu.kind == RoomEventType::RoomMember {
|
|
||||||
if let Some(state_key_uid) = &pdu
|
|
||||||
.state_key
|
|
||||||
.as_ref()
|
|
||||||
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
|
|
||||||
{
|
|
||||||
servers.insert(Box::from(state_key_uid.server_name()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove our server from the server list since it will be added to it by room_servers() and/or the if statement above
|
|
||||||
servers.remove(db.globals.server_name());
|
|
||||||
|
|
||||||
db.sending.send_pdu(servers.into_iter(), &pdu_id)?;
|
|
||||||
|
|
||||||
for appservice in db.appservice.all()? {
|
for appservice in db.appservice.all()? {
|
||||||
if self.appservice_in_room(room_id, &appservice, db)? {
|
if self.appservice_in_room(room_id, &appservice, db)? {
|
||||||
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
|
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
|
||||||
|
@ -768,9 +553,268 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Ok(pdu_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_hash_and_sign_event(
|
||||||
|
&self,
|
||||||
|
pdu_builder: PduBuilder,
|
||||||
|
sender: &UserId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
db: &Database,
|
||||||
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
|
) -> (PduEvent, CanonicalJsonObj) {
|
||||||
|
let PduBuilder {
|
||||||
|
event_type,
|
||||||
|
content,
|
||||||
|
unsigned,
|
||||||
|
state_key,
|
||||||
|
redacts,
|
||||||
|
} = pdu_builder;
|
||||||
|
|
||||||
|
let prev_events: Vec<_> = db
|
||||||
|
.rooms
|
||||||
|
.get_pdu_leaves(room_id)?
|
||||||
|
.into_iter()
|
||||||
|
.take(20)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let create_event = db
|
||||||
|
.rooms
|
||||||
|
.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
|
||||||
|
|
||||||
|
let create_event_content: Option<RoomCreateEventContent> = create_event
|
||||||
|
.as_ref()
|
||||||
|
.map(|create_event| {
|
||||||
|
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
||||||
|
warn!("Invalid create event: {}", e);
|
||||||
|
Error::bad_database("Invalid create event in db.")
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
// If there was no create event yet, assume we are creating a room with the default
|
||||||
|
// version right now
|
||||||
|
let room_version_id = create_event_content
|
||||||
|
.map_or(db.globals.default_room_version(), |create_event| {
|
||||||
|
create_event.room_version
|
||||||
|
});
|
||||||
|
let room_version =
|
||||||
|
RoomVersion::new(&room_version_id).expect("room version is supported");
|
||||||
|
|
||||||
|
let auth_events =
|
||||||
|
self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?;
|
||||||
|
|
||||||
|
// Our depth is the maximum depth of prev_events + 1
|
||||||
|
let depth = prev_events
|
||||||
|
.iter()
|
||||||
|
.filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth))
|
||||||
|
.max()
|
||||||
|
.unwrap_or_else(|| uint!(0))
|
||||||
|
+ uint!(1);
|
||||||
|
|
||||||
|
let mut unsigned = unsigned.unwrap_or_default();
|
||||||
|
|
||||||
|
if let Some(state_key) = &state_key {
|
||||||
|
if let Some(prev_pdu) =
|
||||||
|
self.room_state_get(room_id, &event_type.to_string().into(), state_key)?
|
||||||
|
{
|
||||||
|
unsigned.insert(
|
||||||
|
"prev_content".to_owned(),
|
||||||
|
serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"),
|
||||||
|
);
|
||||||
|
unsigned.insert(
|
||||||
|
"prev_sender".to_owned(),
|
||||||
|
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let pdu = PduEvent {
|
||||||
|
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
|
||||||
|
room_id: room_id.to_owned(),
|
||||||
|
sender: sender_user.to_owned(),
|
||||||
|
origin_server_ts: utils::millis_since_unix_epoch()
|
||||||
|
.try_into()
|
||||||
|
.expect("time is valid"),
|
||||||
|
kind: event_type,
|
||||||
|
content,
|
||||||
|
state_key,
|
||||||
|
prev_events,
|
||||||
|
depth,
|
||||||
|
auth_events: auth_events
|
||||||
|
.iter()
|
||||||
|
.map(|(_, pdu)| pdu.event_id.clone())
|
||||||
|
.collect(),
|
||||||
|
redacts,
|
||||||
|
unsigned: if unsigned.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
|
||||||
|
},
|
||||||
|
hashes: EventHash {
|
||||||
|
sha256: "aaa".to_owned(),
|
||||||
|
},
|
||||||
|
signatures: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let auth_check = state_res::auth_check(
|
||||||
|
&room_version,
|
||||||
|
&pdu,
|
||||||
|
None::<PduEvent>, // TODO: third_party_invite
|
||||||
|
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("{:?}", e);
|
||||||
|
Error::bad_database("Auth check failed.")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if !auth_check {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
ErrorKind::Forbidden,
|
||||||
|
"Event is not authorized.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hash and sign
|
||||||
|
let mut pdu_json =
|
||||||
|
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
|
||||||
|
|
||||||
|
pdu_json.remove("event_id");
|
||||||
|
|
||||||
|
// Add origin because synapse likes that (and it's required in the spec)
|
||||||
|
pdu_json.insert(
|
||||||
|
"origin".to_owned(),
|
||||||
|
to_canonical_value(db.globals.server_name())
|
||||||
|
.expect("server name is a valid CanonicalJsonValue"),
|
||||||
|
);
|
||||||
|
|
||||||
|
match ruma::signatures::hash_and_sign_event(
|
||||||
|
db.globals.server_name().as_str(),
|
||||||
|
db.globals.keypair(),
|
||||||
|
&mut pdu_json,
|
||||||
|
&room_version_id,
|
||||||
|
) {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
return match e {
|
||||||
|
ruma::signatures::Error::PduSize => Err(Error::BadRequest(
|
||||||
|
ErrorKind::TooLarge,
|
||||||
|
"Message is too long",
|
||||||
|
)),
|
||||||
|
_ => Err(Error::BadRequest(
|
||||||
|
ErrorKind::Unknown,
|
||||||
|
"Signing event failed",
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate event id
|
||||||
|
pdu.event_id = EventId::parse_arc(format!(
|
||||||
|
"${}",
|
||||||
|
ruma::signatures::reference_hash(&pdu_json, &room_version_id)
|
||||||
|
.expect("ruma can calculate reference hashes")
|
||||||
|
))
|
||||||
|
.expect("ruma's reference hashes are valid event ids");
|
||||||
|
|
||||||
|
pdu_json.insert(
|
||||||
|
"event_id".to_owned(),
|
||||||
|
CanonicalJsonValue::String(pdu.event_id.as_str().to_owned()),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Generate short event id
|
||||||
|
let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a new persisted data unit and adds it to a room. This function takes a
|
||||||
|
/// roomid_mutex_state, meaning that only this function is able to mutate the room state.
|
||||||
|
#[tracing::instrument(skip(self, db, _mutex_lock))]
|
||||||
|
pub fn build_and_append_pdu(
|
||||||
|
&self,
|
||||||
|
pdu_builder: PduBuilder,
|
||||||
|
sender: &UserId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
db: &Database,
|
||||||
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
|
) -> Result<Arc<EventId>> {
|
||||||
|
|
||||||
|
let (pdu, pdu_json) = create_hash_and_sign_event()?;
|
||||||
|
|
||||||
|
|
||||||
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
||||||
|
// pdu without it's state. This is okay because append_pdu can't fail.
|
||||||
|
let statehashid = self.append_to_state(&pdu, &db.globals)?;
|
||||||
|
|
||||||
|
let pdu_id = self.append_pdu(
|
||||||
|
&pdu,
|
||||||
|
pdu_json,
|
||||||
|
// Since this PDU references all pdu_leaves we can update the leaves
|
||||||
|
// of the room
|
||||||
|
iter::once(&*pdu.event_id),
|
||||||
|
db,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// We set the room state after inserting the pdu, so that we never have a moment in time
|
||||||
|
// where events in the current room state do not exist
|
||||||
|
self.set_room_state(room_id, statehashid)?;
|
||||||
|
|
||||||
|
let mut servers: HashSet<Box<ServerName>> =
|
||||||
|
self.room_servers(room_id).filter_map(|r| r.ok()).collect();
|
||||||
|
|
||||||
|
// In case we are kicking or banning a user, we need to inform their server of the change
|
||||||
|
if pdu.kind == RoomEventType::RoomMember {
|
||||||
|
if let Some(state_key_uid) = &pdu
|
||||||
|
.state_key
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
|
||||||
|
{
|
||||||
|
servers.insert(Box::from(state_key_uid.server_name()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove our server from the server list since it will be added to it by room_servers() and/or the if statement above
|
||||||
|
servers.remove(db.globals.server_name());
|
||||||
|
|
||||||
|
db.sending.send_pdu(servers.into_iter(), &pdu_id)?;
|
||||||
|
|
||||||
Ok(pdu.event_id)
|
Ok(pdu.event_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Append the incoming event setting the state snapshot to the state from the
|
||||||
|
/// server that sent the event.
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
fn append_incoming_pdu<'a>(
|
||||||
|
db: &Database,
|
||||||
|
pdu: &PduEvent,
|
||||||
|
pdu_json: CanonicalJsonObject,
|
||||||
|
new_room_leaves: impl IntoIterator<Item = &'a EventId> + Clone + Debug,
|
||||||
|
state_ids_compressed: HashSet<CompressedStateEvent>,
|
||||||
|
soft_fail: bool,
|
||||||
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
|
) -> Result<Option<Vec<u8>>> {
|
||||||
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
||||||
|
// pdu without it's state. This is okay because append_pdu can't fail.
|
||||||
|
db.rooms.set_event_state(
|
||||||
|
&pdu.event_id,
|
||||||
|
&pdu.room_id,
|
||||||
|
state_ids_compressed,
|
||||||
|
&db.globals,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if soft_fail {
|
||||||
|
db.rooms
|
||||||
|
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
|
||||||
|
db.rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?;
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let pdu_id = db.rooms.append_pdu(pdu, pdu_json, new_room_leaves, db)?;
|
||||||
|
|
||||||
|
Ok(Some(pdu_id))
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns an iterator over all PDUs in a room.
|
/// Returns an iterator over all PDUs in a room.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn all_pdus<'a>(
|
pub fn all_pdus<'a>(
|
Loading…
Add table
Add a link
Reference in a new issue