Skip to content

Commit

Permalink
[Tunnelbroker] close RabbitMQ queue when client disconnects
Browse files Browse the repository at this point in the history
Summary:
This might cause issues and was causing error in past:
1. We run persistence test
2. Queue is created, but never closed
3. We run tests again, messages are in database, but are also put into RabbitMQ
4. Consumer receives messages, and try to send but socket is closed at this time
5. It generates error in Tunnelbroker logs

Depends on D9316

Test Plan:
1. Without this changes run:
`cd services/commtest && cargo test --test tunnelbroker_persist_tests  -- --nocapture`
2. Wait for the test to finish
3. Go to RabbitMQ console and it shows queues:
{F780925}
4. Restart RabbitMQ
5. Do steps 1, 2, 3 and the console should show 0 queues:
{F780926}

Reviewers: michal, bartek, varun, jon

Reviewed By: michal, bartek

Subscribers: ashoat, tomek

Differential Revision: https://phab.comm.dev/D9317
  • Loading branch information
xsanm committed Oct 12, 2023
1 parent 357528d commit 67bce29
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions services/tunnelbroker/src/websockets/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use futures_util::StreamExt;
use hyper_tungstenite::{tungstenite::Message, WebSocketStream};
use lapin::message::Delivery;
use lapin::options::{
BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions,
BasicCancelOptions, BasicConsumeOptions, BasicPublishOptions,
QueueDeclareOptions, QueueDeleteOptions,
};
use lapin::types::FieldTable;
use lapin::BasicProperties;
Expand Down Expand Up @@ -242,10 +243,32 @@ impl<S: AsyncRead + AsyncWrite + Unpin> WebsocketSession<S> {
}
}

// Release websocket and remove from active connections
// Release WebSocket and remove from active connections
pub async fn close(&mut self) {
if let Err(e) = self.tx.close().await {
debug!("Failed to close session: {}", e);
debug!("Failed to close WebSocket session: {}", e);
}

if let Err(e) = self
.amqp_channel
.basic_cancel(
self.amqp_consumer.tag().as_str(),
BasicCancelOptions::default(),
)
.await
{
error!("Failed to cancel consumer: {}", e);
}

if let Err(e) = self
.amqp_channel
.queue_delete(
self.device_info.device_id.as_str(),
QueueDeleteOptions::default(),
)
.await
{
error!("Failed to delete queue: {}", e);
}
}
}

0 comments on commit 67bce29

Please sign in to comment.