Commit ab4c524

bnjbvr and richvdh authored
crypto(perf): don't hold the cache lock while waiting on a user key query (#2806)
* crypto(fix): don't hold the cache lock while waiting on a user key query

  Fixes #2802. The lock was only useful to sync the database and the
  in-memory cache for the users awaiting a key query request. So it's
  possible to slightly tweak the API by moving the method from
  `SyncedKeyQueryManager` to the non-synced `KeyQueryManager`, and to require
  a `StoreCacheGuard` (i.e. the owned lock, so we can manually drop it
  whenever we want). I've looked at all the other methods, and they do
  require the cache for writing into it and the store. In principle we could
  also move `SyncedKeyQueryManager::users_for_key_query` into
  `KeyQueryManager`, but the lock there is held for a very short time, so it
  shouldn't be an issue.

* Add a test for the key query deadlock while waiting for the response.

* Update crates/matrix-sdk-crypto/src/machine.rs

Co-authored-by: Richard van der Hoff <[email protected]>
1 parent c2f4222 · commit ab4c524
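To make the locking change concrete, here is a minimal, self-contained sketch of the pattern this commit adopts: take the owned cache guard by value, use it only for the short synchronization step, and drop it before the long wait. This is an illustration under stated assumptions (tokio with the `full` feature); `CacheGuard`, `sync_tracked_users`, `wait_for_key_query_response`, and `wait_if_pending` are hypothetical stand-ins, not the SDK's actual API.

use std::{sync::Arc, time::Duration};
use tokio::sync::{Mutex, OwnedMutexGuard};

// Hypothetical stand-in for the SDK's `StoreCacheGuard` (an owned lock).
type CacheGuard = OwnedMutexGuard<()>;

async fn sync_tracked_users(_cache: &CacheGuard) {
    // Short-lived use of the cache: reconcile the in-memory list of users
    // awaiting a key query with what the database says.
}

async fn wait_for_key_query_response() {
    // Stand-in for waiting on a `/keys/query` response, which can take
    // arbitrarily long.
    tokio::time::sleep(Duration::from_millis(50)).await;
}

// The fixed shape: do the short synchronization, then drop the guard
// *before* the long wait, so other users of the store aren't blocked.
async fn wait_if_pending(cache: CacheGuard) {
    sync_tracked_users(&cache).await;
    drop(cache);

    wait_for_key_query_response().await;
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(Mutex::new(()));
    let guard = cache.clone().lock_owned().await;

    let waiter = tokio::spawn(wait_if_pending(guard));

    // Because the waiting task releases the guard early, this lock attempt
    // succeeds instead of queueing behind a network wait.
    let _other = cache.lock().await;
    waiter.await.unwrap();
}

Taking the guard by value is what makes the early `drop(cache)` possible; the old `SyncedKeyQueryManager` only borrowed the cache, so the guard outlived the whole wait.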

File tree

3 files changed: +102 −43 lines changed

crates/matrix-sdk-crypto/src/machine.rs

Lines changed: 53 additions & 4 deletions
@@ -408,6 +408,9 @@ impl OlmMachine {
     /// This can be useful if we need the results from [`get_identity`] or
     /// [`get_user_devices`] to be as up-to-date as possible.
     ///
+    /// Note that this request won't be awaited by other calls waiting for a
+    /// user's or device's keys, since this is an out-of-band query.
+    ///
     /// # Arguments
     ///
     /// * `users` - list of users whose keys should be queried
@@ -1606,10 +1609,8 @@ impl OlmMachine {
             self.inner
                 .identity_manager
                 .key_query_manager
-                .synced(&cache)
-                .await?
-                .wait_if_user_key_query_pending(timeout, user_id)
-                .await;
+                .wait_if_user_key_query_pending(cache, timeout, user_id)
+                .await?;
         }
         Ok(())
     }
@@ -4029,4 +4030,52 @@ pub(crate) mod tests {
             "Our identity should not be verified when there's a mismatch in the cross-signing keys"
         );
     }
+
+    #[async_test]
+    async fn test_wait_on_key_query_doesnt_block_store() {
+        // Waiting for a key query shouldn't delay other write attempts to the store.
+        // This test will end immediately if it works, and times out after a few seconds
+        // if it fails.
+
+        let machine = OlmMachine::new(bob_id(), bob_device_id()).await;
+
+        // Mark Alice as a tracked user, so she gets into the group of users for which
+        // we need to query keys.
+        machine.update_tracked_users([alice_id()]).await.unwrap();
+
+        // Start a background task that will wait for the key query to finish silently
+        // in the background.
+        let machine_cloned = machine.clone();
+        let wait = tokio::spawn(async move {
+            let machine = machine_cloned;
+            let user_devices =
+                machine.get_user_devices(alice_id(), Some(Duration::from_secs(10))).await.unwrap();
+            assert!(user_devices.devices().next().is_some());
+        });
+
+        // Let the background task work first.
+        tokio::task::yield_now().await;
+
+        // Create a key upload request and process its response right away.
+        let requests = machine.bootstrap_cross_signing(false).await.unwrap();
+
+        let req = requests.upload_keys_req.expect("upload keys request should be there");
+        let response = keys_upload_response();
+        let mark_request_as_sent = machine.mark_request_as_sent(&req.request_id, &response);
+        tokio::time::timeout(Duration::from_secs(5), mark_request_as_sent)
+            .await
+            .expect("no timeout")
+            .expect("the underlying request has been marked as sent");
+
+        // Answer the key query, so the background task completes immediately.
+        let response = keys_query_response();
+        let key_queries = machine.inner.identity_manager.users_for_key_query().await.unwrap();
+
+        for (id, _) in key_queries {
+            machine.mark_request_as_sent(&id, &response).await.unwrap();
+        }
+
+        // The waiting task should now complete successfully.
+        wait.await.unwrap();
+    }
 }

crates/matrix-sdk-crypto/src/session_manager/sessions.rs

Lines changed: 1 addition & 3 deletions
@@ -188,10 +188,8 @@ impl SessionManager {
             .key_request_machine
             .identity_manager()
             .key_query_manager
-            .synced(&cache)
+            .wait_if_user_key_query_pending(cache, Self::KEYS_QUERY_WAIT_TIME, user_id)
             .await?
-            .wait_if_user_key_query_pending(Self::KEYS_QUERY_WAIT_TIME, user_id)
-            .await
         {
             WasPending => self.store.get_readonly_devices_filtered(user_id).await?,
             _ => user_devices,

crates/matrix-sdk-crypto/src/store/mod.rs

Lines changed: 48 additions & 36 deletions
@@ -160,56 +160,40 @@ impl KeyQueryManager {
 
         Ok(())
     }
-}
-
-pub(crate) struct SyncedKeyQueryManager<'a> {
-    cache: &'a StoreCache,
-    manager: &'a KeyQueryManager,
-}
-
-impl<'a> SyncedKeyQueryManager<'a> {
-    /// Add entries to the list of users being tracked for device changes
-    ///
-    /// Any users not already on the list are flagged as awaiting a key query.
-    /// Users that were already in the list are unaffected.
-    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
-        let mut store_updates = Vec::new();
-        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
-
-        {
-            let mut tracked_users = self.cache.tracked_users.write().unwrap();
-            for user_id in users {
-                if tracked_users.insert(user_id.to_owned()) {
-                    key_query_lock.insert_user(user_id);
-                    store_updates.push((user_id, true))
-                }
-            }
-        }
-
-        self.cache.store.save_tracked_users(&store_updates).await
-    }
 
     /// Wait for a `/keys/query` response to be received if one is expected for
     /// the given user.
     ///
     /// If the given timeout elapses, the method will stop waiting and return
-    /// `UserKeyQueryResult::TimeoutExpired`
+    /// `UserKeyQueryResult::TimeoutExpired`.
+    ///
+    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
+    /// query is pending are up to date, but doesn't hold on to it
+    /// thereafter: the lock is short-lived in this case.
     pub async fn wait_if_user_key_query_pending(
         &self,
+        cache: StoreCacheGuard,
         timeout_duration: Duration,
         user: &UserId,
-    ) -> UserKeyQueryResult {
-        let mut users_for_key_query = self.manager.users_for_key_query.lock().await;
+    ) -> Result<UserKeyQueryResult> {
+        {
+            // Drop the cache early, so we don't keep it while waiting (since writing
+            // the results requires writing to the cache, and thus taking another lock).
+            self.ensure_sync_tracked_users(&cache).await?;
+            drop(cache);
+        }
+
+        let mut users_for_key_query = self.users_for_key_query.lock().await;
         let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
-            return UserKeyQueryResult::WasNotPending;
+            return Ok(UserKeyQueryResult::WasNotPending);
         };
 
         let wait_for_completion = async {
             while !waiter.completed.load(Ordering::Relaxed) {
                 // Register for being notified before releasing the mutex, so
                 // it's impossible to miss a wakeup between the last check for
                 // whether we should wait, and starting to wait.
-                let mut notified = pin!(self.manager.users_for_key_query_notify.notified());
+                let mut notified = pin!(self.users_for_key_query_notify.notified());
                 notified.as_mut().enable();
                 drop(users_for_key_query);
 
@@ -219,7 +203,7 @@ impl KeyQueryManager {
                 // Reclaim the lock before checking the flag to avoid races
                 // when two notifications happen right after each other and the
                 // second one sets the flag we want to wait for.
-                users_for_key_query = self.manager.users_for_key_query.lock().await;
+                users_for_key_query = self.users_for_key_query.lock().await;
             }
         };
 
@@ -231,10 +215,38 @@ impl KeyQueryManager {
                     not finish yet, some devices might be missing."
                 );
 
-                UserKeyQueryResult::TimeoutExpired
+                Ok(UserKeyQueryResult::TimeoutExpired)
+            }
+            _ => Ok(UserKeyQueryResult::WasPending),
+        }
+    }
+}
+
+pub(crate) struct SyncedKeyQueryManager<'a> {
+    cache: &'a StoreCache,
+    manager: &'a KeyQueryManager,
+}
+
+impl<'a> SyncedKeyQueryManager<'a> {
+    /// Add entries to the list of users being tracked for device changes
+    ///
+    /// Any users not already on the list are flagged as awaiting a key query.
+    /// Users that were already in the list are unaffected.
+    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
+        let mut store_updates = Vec::new();
+        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
+
+        {
+            let mut tracked_users = self.cache.tracked_users.write().unwrap();
+            for user_id in users {
+                if tracked_users.insert(user_id.to_owned()) {
+                    key_query_lock.insert_user(user_id);
+                    store_updates.push((user_id, true))
+                }
+            }
             }
-            _ => UserKeyQueryResult::WasPending,
         }
+
+        self.cache.store.save_tracked_users(&store_updates).await
     }
 
     /// Process notifications that users have changed devices.