Skip to content

Commit

Permalink
retry robustness (#83)
Browse files Browse the repository at this point in the history
Why + What changed
===

- add details about what failed where possible instead of just
overriding the error with "Failed to create ws after retrying max number
of times"
- construct `handshake_metadata` earlier so if the await fails it is
before ws is established

the plan is to also stop proc level retries within ai-infra on handshake
errors because otherwise we get an explosion of connection attempts:
- single failure (either due to pid2 crashing or other reasons)
- causes retry at the proc level
- each proc level failure will retry the underlying transport connection
5 times

Test plan
=========

_Describe what you did to test this change to a level of detail that
allows your reviewer to test it_

---------

Co-authored-by: lhchavez <[email protected]>
  • Loading branch information
jackyzha0 and lhchavez authored Sep 11, 2024
1 parent bfdec38 commit acd8297
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions replit_river/client_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,16 @@ async def _establish_new_connection(
client_id = self._client_id
logger.info("Attempting to establish new ws connection")

last_error: Optional[Exception] = None
for i in range(max_retry):
if i > 0:
logger.info(f"Retrying build handshake number {i} times")
if not rate_limit.has_budget(client_id):
logger.debug("No retry budget for %s.", client_id)
break
raise RiverException(
ERROR_HANDSHAKE, f"No retry budget for {client_id}"
) from last_error

rate_limit.consume_budget(client_id)

# if the session is closed, we shouldn't use it
Expand All @@ -118,15 +122,14 @@ async def _establish_new_connection(

try:
websocket_uri = await self._websocket_uri_factory()
handshake_metadata = await self._handshake_metadata_factory()
ws = await websockets.connect(websocket_uri)
session_id = (
self.generate_session_id()
if not old_session
else old_session.session_id
)

handshake_metadata = await self._handshake_metadata_factory()

try:
(
handshake_request,
Expand All @@ -144,16 +147,18 @@ async def _establish_new_connection(
except RiverException as e:
await ws.close()
raise e
except Exception:
except Exception as e:
last_error = e
backoff_time = rate_limit.get_backoff_ms(client_id)
logger.exception(
f"Error connecting, retrying with {backoff_time}ms backoff"
)
await asyncio.sleep(backoff_time / 1000)

raise RiverException(
ERROR_HANDSHAKE,
"Failed to create ws after retrying max number of times",
)
f"Failed to create ws after retrying {max_retry} number of times",
) from last_error

async def _create_new_session(
self,
Expand Down

0 comments on commit acd8297

Please sign in to comment.