From cecac3fa202b5cdca6c4815264b0c127db477479 Mon Sep 17 00:00:00 2001 From: Tim O'Farrell Date: Sat, 23 Nov 2024 08:38:35 -0700 Subject: [PATCH] Announcing session closing (When shutdown is graceful) --- openhands/server/session/manager.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/openhands/server/session/manager.py b/openhands/server/session/manager.py index 4537d794fe04..70bada94e557 100644 --- a/openhands/server/session/manager.py +++ b/openhands/server/session/manager.py @@ -65,6 +65,7 @@ async def _redis_subscribe(self): if session: await session.dispatch(data["data"]) elif message_type == "is_session_running": + # Another node in the cluster is asking if the current node is running the session given. session = self.local_sessions_by_sid.get(sid) if session: await redis_client.publish("oh_event", json.dumps({ @@ -75,6 +76,13 @@ async def _redis_subscribe(self): flag = self._session_is_running_flags.get(sid) if flag: flag.set() + elif message_type == "session_closing": + logger.info(f"session_closing:{sid}") + for connection_id, local_sid in self.local_connection_id_to_session_id.items(): + if sid == local_sid: + logger.warning('local_connection_to_closing_session') + + except asyncio.CancelledError: return except: @@ -221,6 +229,14 @@ async def _cleanup_session(self, session: Session, connection_id: str, force: bo # If no connections, close session if force or (not has_local_connections and not redis_connections): + + # We alert the cluster in case they are interested + if redis_client: + await redis_client.publish("oh_event", json.dumps({ + "sid": session.sid, + "message_type": "session_closing" + })) + logger.info(f'do_close_session') session.close() self.local_sessions_by_sid.pop(session.sid, None)