diff --git a/replit_river/client_transport.py b/replit_river/client_transport.py index f55d33e..1c00b0c 100644 --- a/replit_river/client_transport.py +++ b/replit_river/client_transport.py @@ -104,12 +104,16 @@ async def _establish_new_connection( client_id = self._client_id logger.info("Attempting to establish new ws connection") + last_error: Optional[Exception] = None for i in range(max_retry): if i > 0: logger.info(f"Retrying build handshake number {i} times") if not rate_limit.has_budget(client_id): logger.debug("No retry budget for %s.", client_id) - break + raise RiverException( + ERROR_HANDSHAKE, f"No retry budget for {client_id}" + ) from last_error + rate_limit.consume_budget(client_id) # if the session is closed, we shouldn't use it @@ -118,6 +122,7 @@ async def _establish_new_connection( try: websocket_uri = await self._websocket_uri_factory() + handshake_metadata = await self._handshake_metadata_factory() ws = await websockets.connect(websocket_uri) session_id = ( self.generate_session_id() @@ -125,8 +130,6 @@ async def _establish_new_connection( else old_session.session_id ) - handshake_metadata = await self._handshake_metadata_factory() - try: ( handshake_request, @@ -144,16 +147,18 @@ async def _establish_new_connection( except RiverException as e: await ws.close() raise e - except Exception: + except Exception as e: + last_error = e backoff_time = rate_limit.get_backoff_ms(client_id) logger.exception( f"Error connecting, retrying with {backoff_time}ms backoff" ) await asyncio.sleep(backoff_time / 1000) + raise RiverException( ERROR_HANDSHAKE, - "Failed to create ws after retrying max number of times", - ) + f"Failed to create ws after retrying {max_retry} number of times", + ) from last_error async def _create_new_session( self,