From 2154c9b80b0692e4fac9590b44c5ab7877558b26 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 26 Jul 2024 14:15:38 -0700 Subject: [PATCH] make out-of-order non-fatal (#56) Why === _Describe what prompted you to make this change, link relevant resources: Linear issues, Slack discussions, etc._ - same as https://github.com/replit/river/pull/244 What changed ============ - no longer raise protocol error on out of order and instead close the connection - the reasoning is that the client will attempt to re-establish the connection and re-agree on the right seq number and resend the send buffer - if theres a disagreement at this stage, we will kill the session but otherwise we can actually transparently recover! Test plan ========= _Describe what you did to test this change to a level of detail that allows your reviewer to test it_ --- replit_river/seq_manager.py | 18 ++++++++++++++---- replit_river/session.py | 5 +++++ tests/test_seq_manager.py | 4 ++-- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/replit_river/seq_manager.py b/replit_river/seq_manager.py index f1a8058..fa75b44 100644 --- a/replit_river/seq_manager.py +++ b/replit_river/seq_manager.py @@ -18,6 +18,15 @@ class InvalidMessageException(Exception): pass +class OutOfOrderMessageException(Exception): + """Error when a message is received out of order, we close the connection + and wait for the client to resychronize. If the resychronization fails, + we close the session. + """ + + pass + + class SessionStateMismatchException(Exception): """Error when the session state mismatch, we reject handshake and close the connection""" @@ -68,13 +77,14 @@ async def check_seq_and_update(self, msg: TransportMessage) -> None: f" expected {self.ack}" ) else: - logger.error( + logger.warn( f"Out of order message received got {msg.seq} expected " f"{self.ack}" ) - raise InvalidMessageException( - f"{msg.from_} received out of order, got {msg.seq}" - f" expected {self.ack}" + + raise OutOfOrderMessageException( + f"Out of order message received got {msg.seq} expected " + f"{self.ack}" ) self.receiver_ack = msg.ack await self._set_ack(msg.seq + 1) diff --git a/replit_river/session.py b/replit_river/session.py index 1408bd9..d96a363 100644 --- a/replit_river/session.py +++ b/replit_river/session.py @@ -18,6 +18,7 @@ from replit_river.seq_manager import ( IgnoreMessageException, InvalidMessageException, + OutOfOrderMessageException, SeqManager, ) from replit_river.task_manager import BackgroundTaskManager @@ -200,6 +201,10 @@ async def _handle_messages_from_ws( except IgnoreMessageException as e: logger.debug("Ignoring transport message : %r", e) continue + except OutOfOrderMessageException as e: + logger.error(f"Out of order message, closing connection : {e}") + await ws_wrapper.close() + return except InvalidMessageException as e: logger.error( f"Got invalid transport message, closing session : {e}" diff --git a/tests/test_seq_manager.py b/tests/test_seq_manager.py index 006a950..748f4fc 100644 --- a/tests/test_seq_manager.py +++ b/tests/test_seq_manager.py @@ -4,7 +4,7 @@ from replit_river.seq_manager import ( IgnoreMessageException, - InvalidMessageException, + OutOfOrderMessageException, SeqManager, ) from tests.conftest import transport_message @@ -47,7 +47,7 @@ async def test_message_reception(no_logging_error: NoErrors) -> None: # Test out of order message msg.seq = 2 - with pytest.raises(InvalidMessageException): + with pytest.raises(OutOfOrderMessageException): await manager.check_seq_and_update(msg)