diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs index bc8ca8b2a4..65bf6f31e5 100644 --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -7,7 +7,8 @@ use futures_util::StreamExt; use hyper_tungstenite::{tungstenite::Message, WebSocketStream}; use lapin::message::Delivery; use lapin::options::{ - BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions, + BasicCancelOptions, BasicConsumeOptions, BasicPublishOptions, + QueueDeclareOptions, QueueDeleteOptions, }; use lapin::types::FieldTable; use lapin::BasicProperties; @@ -242,10 +243,32 @@ impl WebsocketSession { } } - // Release websocket and remove from active connections + // Release WebSocket and remove from active connections pub async fn close(&mut self) { if let Err(e) = self.tx.close().await { - debug!("Failed to close session: {}", e); + debug!("Failed to close WebSocket session: {}", e); + } + + if let Err(e) = self + .amqp_channel + .basic_cancel( + self.amqp_consumer.tag().as_str(), + BasicCancelOptions::default(), + ) + .await + { + error!("Failed to cancel consumer: {}", e); + } + + if let Err(e) = self + .amqp_channel + .queue_delete( + self.device_info.device_id.as_str(), + QueueDeleteOptions::default(), + ) + .await + { + error!("Failed to delete queue: {}", e); } } }