diff --git a/twilight-gateway/README.md b/twilight-gateway/README.md index 40fd11dedee..1143e9dca0a 100644 --- a/twilight-gateway/README.md +++ b/twilight-gateway/README.md @@ -87,12 +87,6 @@ async fn runner(mut shard: Shard) { let event = match item { Ok(Event::GatewayClose(_)) if SHUTDOWN.load(Ordering::Relaxed) => break, Ok(event) => event, - Err(source) - if SHUTDOWN.load(Ordering::Relaxed) - && matches!(source.kind(), ReceiveMessageErrorType::WebSocket) => - { - break - } Err(source) => { tracing::warn!(?source, "error receiving event"); diff --git a/twilight-gateway/src/error.rs b/twilight-gateway/src/error.rs index eb80efce5a7..7ba2d36e700 100644 --- a/twilight-gateway/src/error.rs +++ b/twilight-gateway/src/error.rs @@ -175,14 +175,6 @@ impl ReceiveMessageError { source: Some(Box::new(source)), } } - - /// Shortcut to create a new error for a websocket error. - pub(crate) fn from_websocket(source: tokio_websockets::Error) -> Self { - Self { - kind: ReceiveMessageErrorType::WebSocket, - source: Some(Box::new(source)), - } - } } impl Display for ReceiveMessageError { @@ -197,7 +189,6 @@ impl Display for ReceiveMessageError { f.write_str(event) } ReceiveMessageErrorType::Reconnect => f.write_str("failed to reconnect to the gateway"), - ReceiveMessageErrorType::WebSocket => f.write_str("websocket connection error"), } } } @@ -228,8 +219,6 @@ pub enum ReceiveMessageErrorType { }, /// Shard failed to reconnect to the gateway. Reconnect, - /// WebSocket connection error. - WebSocket, } #[cfg(test)] @@ -243,7 +232,7 @@ mod tests { #[test] fn receive_message_error_display() { - let messages: [(ReceiveMessageErrorType, &str); 4] = [ + let messages: [(ReceiveMessageErrorType, &str); 3] = [ ( ReceiveMessageErrorType::Compression, "binary message could not be decompressed", @@ -258,7 +247,6 @@ mod tests { ReceiveMessageErrorType::Reconnect, "failed to reconnect to the gateway", ), - (ReceiveMessageErrorType::WebSocket, "websocket connection error"), ]; for (kind, message) in messages { diff --git a/twilight-gateway/src/message.rs b/twilight-gateway/src/message.rs index 67e35fb4267..297eb111a82 100644 --- a/twilight-gateway/src/message.rs +++ b/twilight-gateway/src/message.rs @@ -24,6 +24,9 @@ pub enum Message { } impl Message { + /// Close message indicating the connection was closed abnormally. + pub(crate) const ABNORMAL_CLOSE: Self = Self::Close(Some(CloseFrame::new(1006, ""))); + /// Whether the message is a close message. pub const fn is_close(&self) -> bool { matches!(self, Self::Close(_)) diff --git a/twilight-gateway/src/shard.rs b/twilight-gateway/src/shard.rs index 39f7bde1d0d..08002954442 100644 --- a/twilight-gateway/src/shard.rs +++ b/twilight-gateway/src/shard.rs @@ -437,8 +437,7 @@ impl Shard { /// continue showing the bot as online until its presence times out. /// /// To read all remaining messages, continue calling [`poll_next`] until it - /// returns [`Message::Close`] or a [`ReceiveMessageErrorType::WebSocket`] - /// error type. + /// returns [`Message::Close`]. /// /// # Example /// @@ -457,7 +456,6 @@ impl Shard { /// match item { /// Ok(Message::Close(_)) => break, /// Ok(Message::Text(_)) => unimplemented!(), - /// Err(source) if matches!(source.kind(), ReceiveMessageErrorType::WebSocket) => break, /// Err(source) => unimplemented!(), /// } /// } @@ -550,19 +548,16 @@ impl Shard { } /// Send and flush the pending message. - fn poll_flush_pending( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush_pending(&mut self, cx: &mut Context<'_>) -> Poll> { if self.pending.is_none() { return Poll::Ready(Ok(())); } - ready!(Pin::new(self.connection.as_mut().unwrap()).poll_ready(cx)).map_err(|source| { + if let Err(e) = ready!(Pin::new(self.connection.as_mut().unwrap()).poll_ready(cx)) { self.disconnect(CloseInitiator::Transport); self.connection = None; - ReceiveMessageError::from_websocket(source) - })?; + return Poll::Ready(Err(e)); + } let pending = self.pending.as_mut().unwrap(); @@ -574,18 +569,17 @@ impl Shard { } let ws_message = pending.gateway_event.take().unwrap().into_websocket_msg(); - if let Err(source) = Pin::new(self.connection.as_mut().unwrap()).start_send(ws_message) - { + if let Err(e) = Pin::new(self.connection.as_mut().unwrap()).start_send(ws_message) { self.disconnect(CloseInitiator::Transport); self.connection = None; - return Poll::Ready(Err(ReceiveMessageError::from_websocket(source))); + return Poll::Ready(Err(e)); } } - if let Err(source) = ready!(Pin::new(self.connection.as_mut().unwrap()).poll_flush(cx)) { + if let Err(e) = ready!(Pin::new(self.connection.as_mut().unwrap()).poll_flush(cx)) { self.disconnect(CloseInitiator::Transport); self.connection = None; - return Poll::Ready(Err(ReceiveMessageError::from_websocket(source))); + return Poll::Ready(Err(e)); } if pending.is_heartbeat { @@ -798,7 +792,9 @@ impl Stream for Shard { _ => {} } - ready!(self.poll_flush_pending(cx))?; + if ready!(self.poll_flush_pending(cx)).is_err() { + return Poll::Ready(Some(Ok(Message::ABNORMAL_CLOSE))); + } if !self.state.is_disconnected() { if let Poll::Ready(frame) = self.user_channel.close_rx.poll_recv(cx) { @@ -807,7 +803,9 @@ impl Stream for Shard { tracing::debug!("sending close frame from user channel"); self.disconnect(CloseInitiator::Shard(frame)); - ready!(self.poll_flush_pending(cx))?; + if ready!(self.poll_flush_pending(cx)).is_err() { + return Poll::Ready(Some(Ok(Message::ABNORMAL_CLOSE))); + } } } @@ -834,7 +832,9 @@ impl Stream for Shard { self.heartbeat_interval_event = false; } - ready!(self.poll_flush_pending(cx))?; + if ready!(self.poll_flush_pending(cx)).is_err() { + return Poll::Ready(Some(Ok(Message::ABNORMAL_CLOSE))); + } } let not_ratelimited = self @@ -873,7 +873,9 @@ impl Stream for Shard { ); self.identify_rx = None; - ready!(self.poll_flush_pending(cx))?; + if ready!(self.poll_flush_pending(cx)).is_err() { + return Poll::Ready(Some(Ok(Message::ABNORMAL_CLOSE))); + } } } @@ -884,7 +886,9 @@ impl Stream for Shard { tracing::debug!("sending command from user channel"); self.pending = Pending::text(command, false); - ready!(self.poll_flush_pending(cx))?; + if ready!(self.poll_flush_pending(cx)).is_err() { + return Poll::Ready(Some(Ok(Message::ABNORMAL_CLOSE))); + } } } @@ -921,24 +925,18 @@ impl Stream for Shard { { continue } - Some(Err(source)) => { + Some(Err(_)) => { self.disconnect(CloseInitiator::Transport); - - return Poll::Ready(Some(Err(ReceiveMessageError { - kind: ReceiveMessageErrorType::WebSocket, - source: Some(Box::new(source)), - }))); + return Poll::Ready(Some(Ok(Message::ABNORMAL_CLOSE))); } None => { - let res = ready!(Pin::new(self.connection.as_mut().unwrap()).poll_close(cx)); + _ = ready!(Pin::new(self.connection.as_mut().unwrap()).poll_close(cx)); tracing::debug!("gateway WebSocket connection closed"); // Unclean closure. if !self.state.is_disconnected() { self.disconnect(CloseInitiator::Transport); } self.connection = None; - - res.map_err(ReceiveMessageError::from_websocket)?; } } };