From 69e703e6e6c790df3ea8206b82674598415747ee Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Wed, 10 Jan 2024 02:45:49 +0000 Subject: [PATCH] Merge AsyncUserSocks into AsyncUsers, make sockid AtomicUsize Revise authentication logic to use constant time string comparison Greatly reduce write locks on users hashmap, particularly when authenticating messages attempt to optimistically load with a read lock --- src/rs/server/src/handleget.rs | 2 +- src/rs/server/src/handlews.rs | 128 +++++++++++---------------------- src/rs/server/src/main.rs | 9 +-- src/rs/server/src/users.rs | 120 ++++++++++++++++++++++++++----- 4 files changed, 148 insertions(+), 111 deletions(-) diff --git a/src/rs/server/src/handleget.rs b/src/rs/server/src/handleget.rs index ce37a2dc..97b216b2 100644 --- a/src/rs/server/src/handleget.rs +++ b/src/rs/server/src/handleget.rs @@ -387,7 +387,7 @@ async fn handle_get_core( } else if path.starts_with("/collection/") { let name = &path["/collection/".len()..]; if let Ok(client) = pgpool.get().await { - if let Some(user) = users.write().await.load(&*client, name).await { + if let Some(user) = users.write().await.load(&*client, name, None).await { let user = user.lock().await; let pool = &user.data.pool; let bound = &user.data.accountbound; diff --git a/src/rs/server/src/handlews.rs b/src/rs/server/src/handlews.rs index 94070a7d..814ac4b6 100644 --- a/src/rs/server/src/handlews.rs +++ b/src/rs/server/src/handlews.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::collections::HashMap; +use std::num::NonZeroUsize; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -35,7 +36,7 @@ use crate::starters::{ORIGINAL_STARTERS, STARTERS}; use crate::users::{self, HashAlgo, UserData, UserObject, UserRole, Users}; use crate::{get_day, PgPool, WsStream}; -static NEXT_SOCK_ID: AtomicUsize = AtomicUsize::new(0); +static NEXT_SOCK_ID: AtomicUsize = AtomicUsize::new(1); const SELL_VALUES: [u8; 5] = [5, 1, 3, 15, 150]; @@ -73,8 +74,7 @@ pub struct Sock { } pub type AsyncUsers = RwLock; -pub type AsyncSocks = RwLock>; -pub type AsyncUserSocks = RwLock>; +pub type AsyncSocks = RwLock>; fn sendmsg(tx: &WsSender, val: &T) where @@ -144,14 +144,7 @@ where client.execute("with arank as (select user_id, arena_id, \"rank\", (row_number() over (partition by arena_id order by score desc, day desc, \"rank\"))::int realrank from arena) update arena set \"rank\" = realrank, bestrank = least(bestrank, realrank) from arank where arank.arena_id = arena.arena_id and arank.user_id = arena.user_id and arank.realrank <> arank.\"rank\"", &[]) } -async fn login_success( - usersocks: &AsyncUserSocks, - tx: &WsSender, - sockid: usize, - user: &mut UserObject, - username: &str, - client: &mut Client, -) { +async fn login_success(tx: &WsSender, user: &mut UserObject, username: &str, client: &mut Client) { if user.id != -1 { let today = get_day(); let oracle = user.data.oracle; @@ -181,9 +174,7 @@ async fn login_success( } if let Ok(userstr) = serde_json::to_string(&WsResponse::login(&*user)) { - if tx.send(Message::Text(userstr)).is_ok() { - usersocks.write().await.insert(String::from(username), sockid); - } + tx.send(Message::Text(userstr)).ok(); } if user.data.daily == 0 { @@ -325,11 +316,10 @@ pub async fn handle_ws( ws: WsStream, pgpool: &PgPool, users: &AsyncUsers, - usersocks: &AsyncUserSocks, socks: &AsyncSocks, tls: &TlsConnector, ) { - let sockid = NEXT_SOCK_ID.fetch_add(1, Ordering::Relaxed); + let Some(sockid) = NonZeroUsize::new(NEXT_SOCK_ID.fetch_add(1, Ordering::Relaxed)) else { return }; let (mut user_ws_tx, mut user_ws_rx) = ws.split(); let (tx, rx) = mpsc::unbounded_channel(); @@ -345,27 +335,13 @@ pub async fn handle_ws( 'msgloop: while let Some(Ok(result)) = user_ws_rx.next().await { let Message::Text(msg) = result else { continue }; if let Ok(msg) = serde_json::from_str::(&msg) { - let mut client = pgpool.get().await.expect("Failed to acquire sql connection"); + let Ok(mut client) = pgpool.get().await else { continue }; match msg { UserMessage::a { u, a, msg } => { - let (user, userid) = if let Ok(row) = - client.query_one("select id, auth from users where name = $1", &[&u]).await - { - if a == row.get::(1) { - if let Some(user) = users.write().await.load(&*client, &u).await { - let rightsock = usersocks.read().await.get(&u) == Some(&sockid); - if !rightsock { - usersocks.write().await.insert(u.clone(), sockid); - } - (user, row.get::(0)) - } else { - continue; - } - } else { - continue; - } - } else { + let Some((user, userid)) = + Users::load_with_auth(users, &*client, &u, &a, sockid).await + else { continue; }; match msg { @@ -378,9 +354,8 @@ pub async fn handle_ws( AuthMessage::modresetpass { m } => { if u == "serprex" { let mut wusers = users.write().await; - if let Some(user) = wusers.load(&*client, &m).await { - let mut user = user.lock().await; - user.auth = String::new(); + if let Some(user) = wusers.load(&*client, &m, None).await { + user.lock().await.auth.clear(); } } } @@ -478,7 +453,7 @@ pub async fn handle_ws( decks.insert(String::from("2"), String::from(sid.2)); decks.insert(String::from("3"), String::from(sid.3)); user.data.decks = decks; - login_success(&usersocks, &tx, sockid, &mut *user, &u, &mut client) + login_success(&tx, &mut *user, &u, &mut client) .await; } } @@ -514,11 +489,7 @@ pub async fn handle_ws( } } AuthMessage::logout => { - let mut wusers = users.write().await; - let mut wusersocks = usersocks.write().await; - wusersocks.remove(&u); - drop(wusersocks); - wusers.evict(&client, &u).await; + users.write().await.evict(&client, &u).await; } AuthMessage::delete => { let params: &[&(dyn ToSql + Sync)] = &[&userid]; @@ -535,10 +506,7 @@ pub async fn handle_ws( .is_ok() { trx.commit().await.ok(); - let mut wusers = users.write().await; - let mut wusersocks = usersocks.write().await; - wusersocks.remove(&u); - wusers.remove(&u); + users.write().await.remove(&u); } } } @@ -630,7 +598,7 @@ pub async fn handle_ws( AuthMessage::modarena { aname, won, lv } => { let mut wusers = users.write().await; if let Some(auserid) = - if let Some(other) = wusers.load(&*client, &aname).await { + if let Some(other) = wusers.load(&*client, &aname, None).await { let mut other = other.lock().await; other.data.gold += if won { 15 } else { 5 }; Some(other.id) @@ -709,7 +677,7 @@ pub async fn handle_ws( } AuthMessage::setgold { t, g } => { if role_check(UserRole::Codesmith, &tx, &client, userid).await { - if let Some(tgt) = users.write().await.load(&*client, &t).await { + if let Some(tgt) = users.write().await.load(&*client, &t, None).await { let mut tgt = tgt.lock().await; sendmsg( &tx, @@ -727,7 +695,7 @@ pub async fn handle_ws( } AuthMessage::addpool { t, pool, bound } => { if role_check(UserRole::Codesmith, &tx, &client, userid).await { - if let Some(tgt) = users.write().await.load(&*client, &t).await { + if let Some(tgt) = users.write().await.load(&*client, &t, None).await { let mut tgt = tgt.lock().await; let curpool = if bound { &mut tgt.data.accountbound @@ -1030,10 +998,8 @@ pub async fn handle_ws( }, ) }; - if let Some(foesockid) = usersocks.read().await.get(&f) { + if let Some((foesockid, foeuser)) = users.read().await.get(&f) { if let Some(foesock) = socks.read().await.get(&foesockid) { - let mut wusers = users.write().await; - if let Some(foeuser) = wusers.load(&*client, &f).await { let foeuserid = foeuser.lock().await.id; if let Ok(trx) = client.transaction().await { trx.execute("delete from match_request mr1 where user_id = $1 and accepted", &[&userid]).await.ok(); @@ -1107,7 +1073,6 @@ pub async fn handle_ws( } } } - } } } AuthMessage::r#move { @@ -1117,7 +1082,7 @@ pub async fn handle_ws( cmd, } => { if let Ok(trx) = client.transaction().await { - if let (Ok(moves), Ok(users)) = ( + if let (Ok(moves), Ok(urows)) = ( trx.query_one( "select g.moves from games g join match_request mr on mr.game_id = g.id join users u on u.id = mr.user_id where g.id = $1 and u.id = $2 for update", &[&id, &userid]).await, @@ -1125,7 +1090,7 @@ pub async fn handle_ws( "select u.id, u.name from match_request mr join users u on mr.user_id = u.id where mr.game_id = $1", &[&id]).await, ) { - if users.iter().all(|row| row.get::(0) != userid) { + if urows.iter().all(|row| row.get::(0) != userid) { sendmsg(&tx, &WsResponse::chat { mode: 1, msg: "You aren't in that match", @@ -1142,13 +1107,13 @@ pub async fn handle_ws( "update games set moves = array_append(moves, $2), expire_at = now() + interval '1 hour' where id = $1", &[&id, &Json(GamesMove { cmd, hash })]).await.is_ok() && trx.commit().await.is_ok() { if let Ok(movejson) = serde_json::to_string(&WsResponse::r#move { cmd, hash }) { - let rusersocks = usersocks.read().await; + let rusers = users.read().await; let rsocks = socks.read().await; - for row in users.iter() { + for row in urows.iter() { let uid: i64 = row.get(0); if uid != userid { let name: &str = row.get(1); - if let Some(sockid) = rusersocks.get(name) { + if let Some(ref sockid) = rusers.get_sockid(name) { if let Some(sock) = rsocks.get(sockid) { sock.tx.send(Message::Text(movejson.clone())).ok(); } @@ -1226,10 +1191,8 @@ pub async fn handle_ws( } AuthMessage::canceltrade { f } => { if u != f { - if let Some(foesockid) = usersocks.read().await.get(&f) { + if let Some((foesockid, foeuser)) = users.read().await.get(&f) { if let Some(foesock) = socks.read().await.get(&foesockid) { - let mut wusers = users.write().await; - if let Some(foeuser) = wusers.load(&*client, &f).await { sendmsg( &foesock.tx, &WsResponse::tradecanceled { u: &u }, @@ -1247,14 +1210,12 @@ pub async fn handle_ws( "delete from trade_request where (user_id = $1 and for_user_id = $2) or (user_id = $2 and for_user_id = $1)", &[&userid, &foeuserid]).await.ok(); } - } } } } AuthMessage::reloadtrade { f } => { if u != f { - let mut wusers = users.write().await; - if let Some(foeuser) = wusers.load(&*client, &f).await { + if let Some((_, foeuser)) = users.read().await.get(&f) { let foeuserid = foeuser.lock().await.id; if let Ok(trade) = client.query_one( "select cards, g from trade_request where user_id = $2 and for_user_id = $1", &[&userid, &foeuserid]).await { @@ -1277,10 +1238,8 @@ pub async fn handle_ws( g, } => { if u != f { - if let Some(foesockid) = usersocks.read().await.get(&f) { + if let Some((foesockid, foeuser)) = users.read().await.get(&f) { if let Some(foesock) = socks.read().await.get(&foesockid) { - let mut wusers = users.write().await; - if let Some(foeuser) = wusers.load(&*client, &f).await { let (mut user, mut foeuser) = ordered_lock(&user, &foeuser).await; if let Ok(trx) = client.transaction().await { @@ -1394,7 +1353,6 @@ pub async fn handle_ws( }); } } - } } } } @@ -1418,7 +1376,7 @@ pub async fn handle_ws( sendmsg(&tx, &WsResponse::passchange { auth: &user.auth }); } AuthMessage::challrecv { f, trade } => { - if let Some(foesockid) = usersocks.read().await.get(&f) { + if let Some(foesockid) = users.read().await.get_sockid(&f) { if let Some(foesock) = socks.read().await.get(&foesockid) { sendmsg( &foesock.tx, @@ -1438,7 +1396,7 @@ pub async fn handle_ws( AuthMessage::chat { to, msg } => { if let Some(to) = to { let mut sent = false; - if let Some(tosockid) = usersocks.read().await.get(&to) { + if let Some(tosockid) = users.read().await.get_sockid(&to) { if let Some(sock) = socks.read().await.get(&tosockid) { if serde_json::to_string(&WsResponse::chatu { mode: 2, @@ -1774,11 +1732,10 @@ pub async fn handle_ws( ); drop(user); let mut wusers = users.write().await; - let rusersocks = usersocks.read().await; let rsocks = socks.read().await; for sell in sells { { - if let Some(seller) = wusers.load(&trx, &sell.u).await { + if let Some(seller) = wusers.load(&trx, &sell.u, None).await { let mut seller = seller.lock().await; if sell.p > 0 { let c = seller @@ -1806,8 +1763,7 @@ pub async fn handle_ws( if let Some(selltx) = if sell.u == u { Some(tx.clone()) } else { - rusersocks - .get(&sell.u) + wusers.get_sockid(&sell.u) .and_then(|sockid| rsocks.get(&sockid)) .map(|sock| sock.tx.clone()) } { @@ -2164,7 +2120,7 @@ pub async fn handle_ws( ); } else { let mut wusers = users.write().await; - let user = if let Some(user) = wusers.load(&*client, &u).await { + let user = if let Some(user) = wusers.load(&*client, &u, Some(sockid)).await { user } else { let user = Arc::new(Mutex::new(UserObject { @@ -2176,7 +2132,7 @@ pub async fn handle_ws( algo: users::HASH_ALGO, data: UserData { oracle: u32::MAX, ..Default::default() }, })); - wusers.insert(u.clone(), user.clone()); + wusers.insert(u.clone(), sockid.get(), user.clone()); user }; let mut user = user.lock().await; @@ -2211,7 +2167,7 @@ pub async fn handle_ws( } else { user.auth.is_empty() } { - login_success(&usersocks, &tx, sockid, &mut *user, &u, &mut client).await; + login_success(&tx, &mut *user, &u, &mut client).await; } else { sendmsg(&tx, &WsResponse::loginfail { err: "Authentication failed" }); } @@ -2238,7 +2194,6 @@ pub async fn handle_ws( (1..output.len()).into_iter().rev().find(|&idx| { output[idx - 1] == b'\n' && output[idx] == b'{' }) { - println!("{}", String::from_utf8_lossy(&output)); if let Ok(Value::Object(body)) = serde_json::from_slice::(&output[pos..]) { @@ -2255,14 +2210,12 @@ pub async fn handle_ws( ); let mut wusers = users.write().await; if let Some(user) = - wusers.load(&*client, &name).await + wusers.load(&*client, &name, Some(sockid)).await { let mut user = user.lock().await; user.auth = g.clone(); login_success( - &usersocks, &tx, - sockid, &mut user, &name, &mut client, @@ -2279,9 +2232,7 @@ pub async fn handle_ws( data: Default::default(), }; login_success( - &usersocks, &tx, - sockid, &mut newuser, &name, &mut client, @@ -2289,6 +2240,7 @@ pub async fn handle_ws( .await; wusers.insert( name, + sockid.get(), Arc::new(Mutex::new(newuser)), ); } @@ -2389,7 +2341,7 @@ pub async fn handle_ws( } } UserMessage::librarywant { f } => { - if let Some(user) = users.write().await.load(&*client, &f).await { + if let Some(user) = users.write().await.load(&*client, &f, None).await { let user = user.lock().await; let mut gold = user.data.gold; let mut pool = user.data.pool.clone(); @@ -2476,10 +2428,10 @@ pub async fn handle_ws( UserMessage::who => { let mut res = String::new(); { - let rusersocks = usersocks.read().await; + let rusers = users.read().await; let rsocks = socks.read().await; - for (name, id) in rusersocks.iter() { - if let Some(sock) = rsocks.get(id) { + for (name, id) in rusers.iter_name_sockid() { + if let Some(sock) = rsocks.get(&id) { if !sock.hide { if !res.is_empty() { res.push_str(", "); diff --git a/src/rs/server/src/main.rs b/src/rs/server/src/main.rs index c46fd935..0c4e17c1 100644 --- a/src/rs/server/src/main.rs +++ b/src/rs/server/src/main.rs @@ -34,7 +34,7 @@ use tokio_rustls::{ use bb8_postgres::{bb8::Pool, tokio_postgres, PostgresConnectionManager}; use crate::handleget::AsyncCache; -use crate::handlews::{AsyncSocks, AsyncUserSocks, AsyncUsers}; +use crate::handlews::{AsyncSocks, AsyncUsers}; pub type PgPool = Pool>; pub type WsStream = WebSocketStream>; @@ -78,7 +78,6 @@ type Error = Box; struct Server { pub users: AsyncUsers, - pub usersocks: AsyncUserSocks, pub socks: AsyncSocks, pub cache: AsyncCache, pub pgpool: PgPool, @@ -103,7 +102,6 @@ impl hyper::service::Service> for ServerService { ws, &server.pgpool, &server.users, - &server.usersocks, &server.socks, &server.tls, ) @@ -160,7 +158,6 @@ async fn main() { let mut gccloserx = closerx.clone(); let users = AsyncUsers::default(); - let usersocks = AsyncUserSocks::default(); let socks = AsyncSocks::default(); let cache = AsyncCache::default(); let tlsconfig = ClientConfig::builder() @@ -169,7 +166,7 @@ async fn main() { }) .with_no_client_auth(); let tls = TlsConnector::from(Arc::new(tlsconfig)); - let server = Arc::new(Server { pgpool, users, usersocks, socks, cache, tls }); + let server = Arc::new(Server { pgpool, users, socks, cache, tls }); let gc = server.clone(); let mut interval = tokio::time::interval(Duration::new(300, 0)); @@ -189,7 +186,7 @@ async fn main() { if let Ok(client) = gc.pgpool.get().await { let mut users = gc.users.write().await; let _ = tokio::join!(users - .store(&client, &gc.usersocks, &gc.socks), + .store(&client, &gc.socks), client.execute("delete from trade_request where expire_at < now()", &[]), client.execute( "with expiredids (id) as (select id from games where expire_at < now()) \ diff --git a/src/rs/server/src/users.rs b/src/rs/server/src/users.rs index 423c5e8a..712939dd 100644 --- a/src/rs/server/src/users.rs +++ b/src/rs/server/src/users.rs @@ -1,15 +1,17 @@ use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::num::NonZeroUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use bb8_postgres::tokio_postgres::types::{FromSql, ToSql}; use bb8_postgres::tokio_postgres::{types::Json, Client, GenericClient}; +use ring::constant_time::verify_slices_are_equal; use ring::pbkdf2; use serde::{Deserialize, Serialize}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock}; use crate::cardpool::Cardpool; -use crate::handlews::{AsyncSocks, AsyncUserSocks}; +use crate::handlews::AsyncSocks; #[derive(Clone, Copy, Debug, ToSql, FromSql)] #[postgres(name = "userrole")] @@ -115,15 +117,83 @@ impl UserObject { pub type User = Arc>; #[derive(Default)] -pub struct Users(HashMap); +pub struct Users(HashMap); impl Users { - pub async fn load(&mut self, client: &GC, name: &str) -> Option + pub async fn load_with_auth( + users: &RwLock, + client: &GC, + name: &str, + auth: &str, + sockid: NonZeroUsize, + ) -> Option<(User, i64)> where GC: GenericClient, { - if let Some((ref gc, ref user)) = self.0.get(name) { + let ruserslock = users.read().await; + let Users(ref rusers) = *ruserslock; + if let Some((ref gc, ref usersockid, ref u)) = rusers.get(name) { + let user = u.lock().await; + return if verify_slices_are_equal(user.auth.as_bytes(), auth.as_bytes()) == Ok(()) { + gc.store(false, Ordering::Release); + usersockid.store(sockid.get(), Ordering::Release); + Some((Arc::clone(u), user.id)) + } else { + None + }; + } + drop(ruserslock); + + // split up loading user data to reduce processing time of spamming failed logins + if let Ok(row) = + client.query_one("select id, auth, salt, iter, algo from users where name = $1", &[&name]).await + { + let userauth = row.get::(1); + if verify_slices_are_equal(userauth.as_bytes(), auth.as_bytes()) == Ok(()) { + let userid = row.get::(0); + let mut wuserslock = users.write().await; + let Users(ref mut wusers) = *wuserslock; + if let Ok(urow) = client + .query_one("select data from user_data where user_id = $1 and type_id = 1", &[&userid]) + .await + { + let Ok(Json(userdata)) = urow.try_get::>(0) else { + panic!("Invalid json for user {}", name); + }; + let userarc = Arc::new(Mutex::new(UserObject { + name: String::from(name), + id: row.get::(0), + auth: row.get::(1), + salt: row.get::>(2), + iter: row.get::(3) as u32, + algo: row.get::(4), + data: userdata, + })); + wusers.insert( + String::from(name), + (AtomicBool::new(false), AtomicUsize::new(sockid.get()), userarc.clone()), + ); + Some((userarc, userid)) + } else { + None + } + } else { + None + } + } else { + None + } + } + + pub async fn load(&mut self, client: &GC, name: &str, sockid: Option) -> Option + where + GC: GenericClient, + { + if let Some((ref gc, ref usersockid, ref user)) = self.0.get(name) { gc.store(false, Ordering::Release); + if let Some(id) = sockid { + usersockid.store(id.get(), Ordering::Release); + } Some(user.clone()) } else if let Some(row) = client.query_opt("select u.id, u.auth, u.salt, u.iter, u.algo, ud.data from user_data ud join users u on u.id = ud.user_id where u.name = $1 and ud.type_id = 1", &[&name]).await.expect("Connection failed while loading user") { let Json(userdata) = row.try_get::>(5).expect("Invalid json for user"); @@ -137,15 +207,31 @@ impl Users { algo: row.get::(4), data: userdata, })); - self.insert(namestr, userarc.clone()); + self.insert(namestr, if let Some(id) = sockid { id.get() } else { 0 }, userarc.clone()); Some(userarc) } else { None } } - pub fn insert(&mut self, name: String, user: User) { - self.0.insert(name, (AtomicBool::new(false), user)); + pub fn get(&self, name: &str) -> Option<(NonZeroUsize, User)> { + self.0.get(name).and_then(|(_, sockid, user)| { + NonZeroUsize::new(sockid.load(Ordering::Acquire)).map(|sockid| (sockid, user.clone())) + }) + } + + pub fn get_sockid(&self, name: &str) -> Option { + self.0.get(name).and_then(|(_, sockid, _)| NonZeroUsize::new(sockid.load(Ordering::Acquire))) + } + + pub fn iter_name_sockid(&self) -> impl Iterator { + return self.0.iter().filter_map(|(name, (_, sockid, _))| { + NonZeroUsize::new(sockid.load(Ordering::Acquire)).map(|sockid| (name.as_str(), sockid)) + }); + } + + pub fn insert(&mut self, name: String, sockid: usize, user: User) { + self.0.insert(name, (AtomicBool::new(false), AtomicUsize::new(sockid), user)); } pub fn remove(&mut self, name: &str) { @@ -153,7 +239,7 @@ impl Users { } pub async fn evict(&mut self, client: &Client, name: &str) { - if let Some((_, user)) = self.0.remove(name) { + if let Some((_, _, user)) = self.0.remove(name) { let user = user.lock().await; client .query( @@ -167,7 +253,7 @@ impl Users { pub async fn saveall(&self, client: &Client) -> bool { let mut queries = Vec::new(); - for &(_, ref user) in self.0.values() { + for &(_, _, ref user) in self.0.values() { queries.push(async move { let user = user.lock().await; client @@ -181,12 +267,14 @@ impl Users { futures::future::join_all(queries).await.into_iter().all(|x| x.is_ok()) } - pub async fn store(&mut self, client: &Client, usersocks: &AsyncUserSocks, socks: &AsyncSocks) { + pub async fn store(&mut self, client: &Client, socks: &AsyncSocks) { if self.saveall(client).await { - let mut usersocks = usersocks.write().await; - let socks = socks.read().await; - usersocks.retain(|_, v| socks.contains_key(v)); - self.0.retain(|_, (ref gc, _)| gc.swap(false, Ordering::AcqRel)); + let rsocks = socks.read().await; + self.0.retain(|_, (ref gc, ref sockid, _)| { + NonZeroUsize::new(sockid.load(Ordering::Acquire)) + .map(|id| rsocks.contains_key(&id)) + .unwrap_or(false) || gc.swap(false, Ordering::AcqRel) + }); } } }