mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-07-31 12:18:31 +00:00
improvement: make more things async
This commit is contained in:
parent
9b898248c7
commit
0bc03e90a1
7 changed files with 244 additions and 155 deletions
|
@ -137,7 +137,7 @@ pub async fn get_context_route(
|
|||
.expect("All rooms have state"),
|
||||
};
|
||||
|
||||
let state_ids = db.rooms.state_full_ids(shortstatehash)?;
|
||||
let state_ids = db.rooms.state_full_ids(shortstatehash).await?;
|
||||
|
||||
let end_token = events_after
|
||||
.last()
|
||||
|
|
|
@ -29,7 +29,7 @@ use ruma::{
|
|||
};
|
||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
|
||||
collections::{hash_map::Entry, BTreeMap, HashMap},
|
||||
iter,
|
||||
sync::{Arc, RwLock},
|
||||
time::{Duration, Instant},
|
||||
|
@ -48,19 +48,20 @@ pub async fn join_room_by_id_route(
|
|||
) -> Result<join_room_by_id::v3::Response> {
|
||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||
|
||||
let mut servers: HashSet<_> = db
|
||||
.rooms
|
||||
.invite_state(sender_user, &body.room_id)?
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
|
||||
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
|
||||
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
||||
.filter_map(|sender| UserId::parse(sender).ok())
|
||||
.map(|user| user.server_name().to_owned())
|
||||
.collect();
|
||||
let mut servers = Vec::new(); // There is no body.server_name for /roomId/join
|
||||
servers.extend(
|
||||
db.rooms
|
||||
.invite_state(sender_user, &body.room_id)?
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
|
||||
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
|
||||
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
||||
.filter_map(|sender| UserId::parse(sender).ok())
|
||||
.map(|user| user.server_name().to_owned()),
|
||||
);
|
||||
|
||||
servers.insert(body.room_id.server_name().to_owned());
|
||||
servers.push(body.room_id.server_name().to_owned());
|
||||
|
||||
let ret = join_room_by_id_helper(
|
||||
&db,
|
||||
|
@ -91,19 +92,20 @@ pub async fn join_room_by_id_or_alias_route(
|
|||
|
||||
let (servers, room_id) = match Box::<RoomId>::try_from(body.room_id_or_alias) {
|
||||
Ok(room_id) => {
|
||||
let mut servers: HashSet<_> = db
|
||||
.rooms
|
||||
.invite_state(sender_user, &room_id)?
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
|
||||
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
|
||||
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
||||
.filter_map(|sender| UserId::parse(sender).ok())
|
||||
.map(|user| user.server_name().to_owned())
|
||||
.collect();
|
||||
let mut servers = body.server_name.clone();
|
||||
servers.extend(
|
||||
db.rooms
|
||||
.invite_state(sender_user, &room_id)?
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
|
||||
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
|
||||
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
||||
.filter_map(|sender| UserId::parse(sender).ok())
|
||||
.map(|user| user.server_name().to_owned()),
|
||||
);
|
||||
|
||||
servers.insert(room_id.server_name().to_owned());
|
||||
servers.push(room_id.server_name().to_owned());
|
||||
(servers, room_id)
|
||||
}
|
||||
Err(room_alias) => {
|
||||
|
@ -413,7 +415,8 @@ pub async fn get_member_events_route(
|
|||
Ok(get_member_events::v3::Response {
|
||||
chunk: db
|
||||
.rooms
|
||||
.room_state_full(&body.room_id)?
|
||||
.room_state_full(&body.room_id)
|
||||
.await?
|
||||
.iter()
|
||||
.filter(|(key, _)| key.0 == StateEventType::RoomMember)
|
||||
.map(|(_, pdu)| pdu.to_member_event().into())
|
||||
|
@ -462,7 +465,7 @@ async fn join_room_by_id_helper(
|
|||
db: &Database,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
servers: &HashSet<Box<ServerName>>,
|
||||
servers: &[Box<ServerName>],
|
||||
_third_party_signed: Option<&IncomingThirdPartySigned>,
|
||||
) -> Result<join_room_by_id::v3::Response> {
|
||||
let sender_user = sender_user.expect("user is authenticated");
|
||||
|
@ -478,7 +481,7 @@ async fn join_room_by_id_helper(
|
|||
let state_lock = mutex_state.lock().await;
|
||||
|
||||
// Ask a remote server if we don't have this room
|
||||
if !db.rooms.exists(room_id)? && room_id.server_name() != db.globals.server_name() {
|
||||
if !db.rooms.exists(room_id)? {
|
||||
let mut make_join_response_and_server = Err(Error::BadServerResponse(
|
||||
"No server available to assist in joining.",
|
||||
));
|
||||
|
@ -1032,6 +1035,13 @@ pub(crate) async fn invite_helper<'a>(
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
if !db.rooms.is_joined(sender_user, &room_id)? {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::Forbidden,
|
||||
"You don't have permission to view this room.",
|
||||
));
|
||||
}
|
||||
|
||||
let mutex_state = Arc::clone(
|
||||
db.globals
|
||||
.roomid_mutex_state
|
||||
|
|
|
@ -124,7 +124,8 @@ pub async fn get_state_events_route(
|
|||
Ok(get_state_events::v3::Response {
|
||||
room_state: db
|
||||
.rooms
|
||||
.room_state_full(&body.room_id)?
|
||||
.room_state_full(&body.room_id)
|
||||
.await?
|
||||
.values()
|
||||
.map(|pdu| pdu.to_state_event())
|
||||
.collect(),
|
||||
|
|
|
@ -230,18 +230,20 @@ async fn sync_helper(
|
|||
for room_id in all_joined_rooms {
|
||||
let room_id = room_id?;
|
||||
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
// This will make sure the we have all events until next_batch
|
||||
let mutex_insert = Arc::clone(
|
||||
db.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
drop(insert_lock);
|
||||
{
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
// This will make sure the we have all events until next_batch
|
||||
let mutex_insert = Arc::clone(
|
||||
db.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
drop(insert_lock);
|
||||
}
|
||||
|
||||
let timeline_pdus;
|
||||
let limited;
|
||||
|
@ -296,10 +298,12 @@ async fn sync_helper(
|
|||
|
||||
// Database queries:
|
||||
|
||||
let current_shortstatehash = db
|
||||
.rooms
|
||||
.current_shortstatehash(&room_id)?
|
||||
.expect("All rooms have state");
|
||||
let current_shortstatehash = if let Some(s) = db.rooms.current_shortstatehash(&room_id)? {
|
||||
s
|
||||
} else {
|
||||
error!("Room {} has no state", room_id);
|
||||
continue;
|
||||
};
|
||||
|
||||
let since_shortstatehash = db.rooms.get_token_shortstatehash(&room_id, since)?;
|
||||
|
||||
|
@ -377,11 +381,12 @@ async fn sync_helper(
|
|||
|
||||
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
|
||||
|
||||
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?;
|
||||
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash).await?;
|
||||
|
||||
let mut state_events = Vec::new();
|
||||
let mut lazy_loaded = HashSet::new();
|
||||
|
||||
let mut i = 0;
|
||||
for (shortstatekey, id) in current_state_ids {
|
||||
let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?;
|
||||
|
||||
|
@ -394,6 +399,11 @@ async fn sync_helper(
|
|||
}
|
||||
};
|
||||
state_events.push(pdu);
|
||||
|
||||
i += 1;
|
||||
if i % 100 == 0 {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
} else if !lazy_load_enabled
|
||||
|| body.full_state
|
||||
|| timeline_users.contains(&state_key)
|
||||
|
@ -411,6 +421,11 @@ async fn sync_helper(
|
|||
lazy_loaded.insert(uid);
|
||||
}
|
||||
state_events.push(pdu);
|
||||
|
||||
i += 1;
|
||||
if i % 100 == 0 {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -462,8 +477,8 @@ async fn sync_helper(
|
|||
let mut lazy_loaded = HashSet::new();
|
||||
|
||||
if since_shortstatehash != current_shortstatehash {
|
||||
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?;
|
||||
let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?;
|
||||
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash).await?;
|
||||
let since_state_ids = db.rooms.state_full_ids(since_shortstatehash).await?;
|
||||
|
||||
for (key, id) in current_state_ids {
|
||||
if body.full_state || since_state_ids.get(&key) != Some(&id) {
|
||||
|
@ -490,6 +505,7 @@ async fn sync_helper(
|
|||
}
|
||||
|
||||
state_events.push(pdu);
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -753,17 +769,19 @@ async fn sync_helper(
|
|||
for result in all_left_rooms {
|
||||
let (room_id, left_state_events) = result?;
|
||||
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let mutex_insert = Arc::clone(
|
||||
db.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
drop(insert_lock);
|
||||
{
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let mutex_insert = Arc::clone(
|
||||
db.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
drop(insert_lock);
|
||||
}
|
||||
|
||||
let left_count = db.rooms.get_left_count(&room_id, &sender_user)?;
|
||||
|
||||
|
@ -793,17 +811,19 @@ async fn sync_helper(
|
|||
for result in all_invited_rooms {
|
||||
let (room_id, invite_state_events) = result?;
|
||||
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let mutex_insert = Arc::clone(
|
||||
db.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
drop(insert_lock);
|
||||
{
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let mutex_insert = Arc::clone(
|
||||
db.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
drop(insert_lock);
|
||||
}
|
||||
|
||||
let invite_count = db.rooms.get_invite_count(&room_id, &sender_user)?;
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue