Skip to content

Commit

Permalink
Announcing session closing (When shutdown is graceful)
Browse files Browse the repository at this point in the history
  • Loading branch information
tofarr committed Nov 23, 2024
1 parent c74961a commit cecac3f
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions openhands/server/session/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async def _redis_subscribe(self):
if session:
await session.dispatch(data["data"])
elif message_type == "is_session_running":
# Another node in the cluster is asking if the current node is running the session given.
session = self.local_sessions_by_sid.get(sid)
if session:
await redis_client.publish("oh_event", json.dumps({
Expand All @@ -75,6 +76,13 @@ async def _redis_subscribe(self):
flag = self._session_is_running_flags.get(sid)
if flag:
flag.set()
elif message_type == "session_closing":
logger.info(f"session_closing:{sid}")
for connection_id, local_sid in self.local_connection_id_to_session_id.items():
if sid == local_sid:
logger.warning('local_connection_to_closing_session')


except asyncio.CancelledError:
return
except:
Expand Down Expand Up @@ -221,6 +229,14 @@ async def _cleanup_session(self, session: Session, connection_id: str, force: bo

# If no connections, close session
if force or (not has_local_connections and not redis_connections):

# We alert the cluster in case they are interested
if redis_client:
await redis_client.publish("oh_event", json.dumps({
"sid": session.sid,
"message_type": "session_closing"
}))

logger.info(f'do_close_session')
session.close()
self.local_sessions_by_sid.pop(session.sid, None)

0 comments on commit cecac3f

Please sign in to comment.