Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Redis polling to dedicated thread #5947

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions openhands/server/session/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import threading
import time
from dataclasses import dataclass, field
from uuid import uuid4
Expand Down Expand Up @@ -44,7 +45,8 @@ class SessionManager:
_local_agent_loops_by_sid: dict[str, Session] = field(default_factory=dict)
local_connection_id_to_session_id: dict[str, str] = field(default_factory=dict)
_last_alive_timestamps: dict[str, float] = field(default_factory=dict)
_redis_listen_task: asyncio.Task | None = None
_redis_listen_thread: threading.Thread | None = None
_redis_listen_stop_event: threading.Event = field(default_factory=threading.Event)
_session_is_running_checks: dict[str, _SessionIsRunningCheck] = field(
default_factory=dict
)
Expand All @@ -63,14 +65,18 @@ class SessionManager:
async def __aenter__(self):
redis_client = self._get_redis_client()
if redis_client:
self._redis_listen_task = asyncio.create_task(self._redis_subscribe())
self._redis_listen_stop_event.clear()
self._redis_listen_thread = threading.Thread(target=self._run_redis_subscribe)
self._redis_listen_thread.daemon = True
self._redis_listen_thread.start()
self._cleanup_task = asyncio.create_task(self._cleanup_detached_conversations())
return self

async def __aexit__(self, exc_type, exc_value, traceback):
if self._redis_listen_task:
self._redis_listen_task.cancel()
self._redis_listen_task = None
if self._redis_listen_thread:
self._redis_listen_stop_event.set()
self._redis_listen_thread.join()
self._redis_listen_thread = None
if self._cleanup_task:
self._cleanup_task.cancel()
self._cleanup_task = None
Expand All @@ -79,31 +85,32 @@ def _get_redis_client(self):
redis_client = getattr(self.sio.manager, 'redis', None)
return redis_client

async def _redis_subscribe(self):
def _run_redis_subscribe(self):
"""
We use a redis backchannel to send actions between server nodes
We use a redis backchannel to send actions between server nodes.
This method runs in a separate thread.
"""
logger.debug('_redis_subscribe')
redis_client = self._get_redis_client()
pubsub = redis_client.pubsub()
await pubsub.subscribe('oh_event')
while should_continue():
pubsub.subscribe('oh_event')

while not self._redis_listen_stop_event.is_set() and should_continue():
try:
message = await pubsub.get_message(
message = pubsub.get_message(
ignore_subscribe_messages=True, timeout=5
)
if message:
await self._process_message(message)
except asyncio.CancelledError:
return
except Exception:
try:
asyncio.get_running_loop()
logger.warning(
'error_reading_from_redis', exc_info=True, stack_info=True
# Schedule the message processing in the event loop
asyncio.run_coroutine_threadsafe(
self._process_message(message),
asyncio.get_event_loop()
)
except RuntimeError:
return # Loop has been shut down
except Exception:
logger.warning(
'error_reading_from_redis', exc_info=True, stack_info=True
)
time.sleep(1) # Avoid tight loop on error

async def _process_message(self, message: dict):
data = json.loads(message['data'])
Expand Down
Loading