From 28fae411ccc8185ed9a3939d5d80c47ee8f45308 Mon Sep 17 00:00:00 2001 From: Jai A Date: Fri, 27 Dec 2024 16:56:05 -0700 Subject: [PATCH] Fix sockets causing actix hangs --- apps/labrinth/src/routes/internal/statuses.rs | 32 +++++++++++-------- apps/labrinth/src/routes/v3/friends.rs | 18 ++++++----- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/apps/labrinth/src/routes/internal/statuses.rs b/apps/labrinth/src/routes/internal/statuses.rs index b54af1869..3ac6ee923 100644 --- a/apps/labrinth/src/routes/internal/statuses.rs +++ b/apps/labrinth/src/routes/internal/statuses.rs @@ -10,9 +10,9 @@ use crate::queue::socket::ActiveSockets; use crate::routes::ApiError; use actix_web::web::{Data, Payload}; use actix_web::{get, web, HttpRequest, HttpResponse}; -use actix_ws::AggregatedMessage; +use actix_ws::Message; use chrono::Utc; -use futures_util::StreamExt; +use futures_util::{StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -128,13 +128,13 @@ pub async fn ws_init( ) .await?; - let mut stream = msg_stream.aggregate_continuations(); + let mut stream = msg_stream.into_stream(); actix_web::rt::spawn(async move { // receive messages from websocket while let Some(msg) = stream.next().await { match msg { - Ok(AggregatedMessage::Text(text)) => { + Ok(Message::Text(text)) => { if let Ok(message) = serde_json::from_str::(&text) { @@ -159,10 +159,14 @@ pub async fn ws_init( status.profile_name = profile_name; status.last_update = Utc::now(); + let user_status = status.clone(); + // We drop the pair to avoid holding the lock for too long + drop(pair); + let _ = broadcast_friends( user.id, ServerToClientMessage::StatusUpdate { - status: status.clone(), + status: user_status, }, &pool, &db, @@ -175,15 +179,14 @@ pub async fn ws_init( } } - Ok(AggregatedMessage::Close(_)) => { + Ok(Message::Close(_)) => { let _ = close_socket(user.id, &pool, &db).await; } - Ok(AggregatedMessage::Ping(msg)) => { - if let Some(mut socket) = db.auth_sockets.get_mut(&user.id) - { - let (_, socket) = socket.value_mut(); - let _ = socket.pong(&msg).await; + Ok(Message::Ping(msg)) => { + if let Some(mut socket) = db.auth_sockets.get(&user.id) { + let (_, socket) = socket.value(); + let _ = socket.clone().pong(&msg).await; } } @@ -219,11 +222,12 @@ pub async fn broadcast_friends( if friend.accepted { if let Some(mut socket) = - sockets.auth_sockets.get_mut(&friend_id.into()) + sockets.auth_sockets.get(&friend_id.into()) { - let (_, socket) = socket.value_mut(); + let (_, socket) = socket.value(); - let _ = socket.text(serde_json::to_string(&message)?).await; + let _ = + socket.clone().text(serde_json::to_string(&message)?).await; } } } diff --git a/apps/labrinth/src/routes/v3/friends.rs b/apps/labrinth/src/routes/v3/friends.rs index 9541c3dd5..f7e5a96ef 100644 --- a/apps/labrinth/src/routes/v3/friends.rs +++ b/apps/labrinth/src/routes/v3/friends.rs @@ -78,12 +78,13 @@ pub async fn add_friend( ) -> Result<(), ApiError> { if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) { let (friend_status, _) = pair.value(); - if let Some(mut socket) = - sockets.auth_sockets.get_mut(&friend_id.into()) + if let Some(socket) = + sockets.auth_sockets.get(&friend_id.into()) { - let (_, socket) = socket.value_mut(); + let (_, socket) = socket.value(); let _ = socket + .clone() .text(serde_json::to_string( &ServerToClientMessage::StatusUpdate { status: friend_status.clone(), @@ -120,11 +121,11 @@ pub async fn add_friend( .insert(&mut transaction) .await?; - if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) - { - let (_, socket) = socket.value_mut(); + if let Some(socket) = db.auth_sockets.get(&friend.id.into()) { + let (_, socket) = socket.value(); if socket + .clone() .text(serde_json::to_string( &ServerToClientMessage::FriendRequest { from: user.id }, )?) @@ -177,10 +178,11 @@ pub async fn remove_friend( ) .await?; - if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) { - let (_, socket) = socket.value_mut(); + if let Some(mut socket) = db.auth_sockets.get(&friend.id.into()) { + let (_, socket) = socket.value(); let _ = socket + .clone() .text(serde_json::to_string( &ServerToClientMessage::FriendRequestRejected { from: user.id,