1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-06-27 16:35:59 +00:00

Merge branch 'sync-include-leave' into 'next'

fix(api/sync): respect `include_leave` filter parameter

See merge request famedly/conduit!688
This commit is contained in:
avdb 2025-06-24 04:49:29 +00:00
commit 4bbd8c7764

View file

@ -300,65 +300,164 @@ async fn sync_helper(
} }
let mut left_rooms = BTreeMap::new(); let mut left_rooms = BTreeMap::new();
let all_left_rooms: Vec<_> = services() if filter.room.include_leave {
.rooms let all_left_rooms: Vec<_> = services()
.state_cache
.rooms_left(&sender_user)
.collect();
for result in all_left_rooms {
let (room_id, _) = result?;
{
// Get and drop the lock to wait for remaining operations to finish
let mutex_insert = Arc::clone(
services()
.globals
.roomid_mutex_insert
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
drop(insert_lock);
}
let left_count = services()
.rooms .rooms
.state_cache .state_cache
.get_left_count(&room_id, &sender_user)?; .rooms_left(&sender_user)
.collect();
// Left before last sync for result in all_left_rooms {
if Some(since) >= left_count { let (room_id, _) = result?;
continue;
}
if !services().rooms.metadata.exists(&room_id)? { {
// This is just a rejected invite, not a room we know // Get and drop the lock to wait for remaining operations to finish
let event = PduEvent { let mutex_insert = Arc::clone(
event_id: EventId::new(services().globals.server_name()).into(), services()
sender: sender_user.clone(), .globals
origin_server_ts: utils::millis_since_unix_epoch() .roomid_mutex_insert
.try_into() .write()
.expect("Timestamp is valid js_int value"), .await
kind: TimelineEventType::RoomMember, .entry(room_id.clone())
content: serde_json::from_str(r#"{ "membership": "leave"}"#).unwrap(), .or_default(),
state_key: Some(sender_user.to_string()), );
unsigned: None, let insert_lock = mutex_insert.lock().await;
// The following keys are dropped on conversion drop(insert_lock);
room_id: room_id.clone(), }
prev_events: vec![],
depth: uint!(1), let left_count = services()
auth_events: vec![], .rooms
redacts: None, .state_cache
hashes: EventHash { .get_left_count(&room_id, &sender_user)?;
sha256: String::new(),
}, // Left before last sync
signatures: None, if Some(since) >= left_count {
continue;
}
if !services().rooms.metadata.exists(&room_id)? {
// This is just a rejected invite, not a room we know
let event = PduEvent {
event_id: EventId::new(services().globals.server_name()).into(),
sender: sender_user.clone(),
origin_server_ts: utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
kind: TimelineEventType::RoomMember,
content: serde_json::from_str(r#"{ "membership": "leave"}"#).unwrap(),
state_key: Some(sender_user.to_string()),
unsigned: None,
// The following keys are dropped on conversion
room_id: room_id.clone(),
prev_events: vec![],
depth: uint!(1),
auth_events: vec![],
redacts: None,
hashes: EventHash {
sha256: String::new(),
},
signatures: None,
};
left_rooms.insert(
room_id,
LeftRoom {
account_data: RoomAccountData { events: Vec::new() },
timeline: Timeline {
limited: false,
prev_batch: Some(next_batch_string.clone()),
events: Vec::new(),
},
state: State {
events: vec![event.to_sync_state_event()],
},
},
);
continue;
}
let mut left_state_events = Vec::new();
let since_shortstatehash = services()
.rooms
.user
.get_token_shortstatehash(&room_id, since)?;
let since_state_ids = match since_shortstatehash {
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
None => HashMap::new(),
}; };
let left_event_id = match services().rooms.state_accessor.room_state_get_id(
&room_id,
&StateEventType::RoomMember,
sender_user.as_str(),
)? {
Some(e) => e,
None => {
error!("Left room but no left state event");
continue;
}
};
let left_shortstatehash = match services()
.rooms
.state_accessor
.pdu_shortstatehash(&left_event_id)?
{
Some(s) => s,
None => {
error!("Leave event has no state");
continue;
}
};
let mut left_state_ids = services()
.rooms
.state_accessor
.state_full_ids(left_shortstatehash)
.await?;
let leave_shortstatekey = services()
.rooms
.short
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
left_state_ids.insert(leave_shortstatekey, left_event_id);
let mut i = 0;
for (key, id) in left_state_ids {
if full_state || since_state_ids.get(&key) != Some(&id) {
let (event_type, state_key) =
services().rooms.short.get_statekey_from_short(key)?;
if !lazy_load_enabled
|| event_type != StateEventType::RoomMember
|| full_state
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| *sender_user == state_key
{
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
continue;
}
};
left_state_events.push(pdu.to_sync_state_event());
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
}
}
left_rooms.insert( left_rooms.insert(
room_id, room_id.clone(),
LeftRoom { LeftRoom {
account_data: RoomAccountData { events: Vec::new() }, account_data: RoomAccountData { events: Vec::new() },
timeline: Timeline { timeline: Timeline {
@ -367,107 +466,11 @@ async fn sync_helper(
events: Vec::new(), events: Vec::new(),
}, },
state: State { state: State {
events: vec![event.to_sync_state_event()], events: left_state_events,
}, },
}, },
); );
continue;
} }
let mut left_state_events = Vec::new();
let since_shortstatehash = services()
.rooms
.user
.get_token_shortstatehash(&room_id, since)?;
let since_state_ids = match since_shortstatehash {
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
None => HashMap::new(),
};
let left_event_id = match services().rooms.state_accessor.room_state_get_id(
&room_id,
&StateEventType::RoomMember,
sender_user.as_str(),
)? {
Some(e) => e,
None => {
error!("Left room but no left state event");
continue;
}
};
let left_shortstatehash = match services()
.rooms
.state_accessor
.pdu_shortstatehash(&left_event_id)?
{
Some(s) => s,
None => {
error!("Leave event has no state");
continue;
}
};
let mut left_state_ids = services()
.rooms
.state_accessor
.state_full_ids(left_shortstatehash)
.await?;
let leave_shortstatekey = services()
.rooms
.short
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
left_state_ids.insert(leave_shortstatekey, left_event_id);
let mut i = 0;
for (key, id) in left_state_ids {
if full_state || since_state_ids.get(&key) != Some(&id) {
let (event_type, state_key) =
services().rooms.short.get_statekey_from_short(key)?;
if !lazy_load_enabled
|| event_type != StateEventType::RoomMember
|| full_state
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| *sender_user == state_key
{
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
continue;
}
};
left_state_events.push(pdu.to_sync_state_event());
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
}
}
left_rooms.insert(
room_id.clone(),
LeftRoom {
account_data: RoomAccountData { events: Vec::new() },
timeline: Timeline {
limited: false,
prev_batch: Some(next_batch_string.clone()),
events: Vec::new(),
},
state: State {
events: left_state_events,
},
},
);
} }
let mut invited_rooms = BTreeMap::new(); let mut invited_rooms = BTreeMap::new();