Skip to content

Commit

Permalink
Add system event listers for monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
raymyers committed Feb 25, 2025
1 parent f35ed5e commit a32a789
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 1 deletion.
7 changes: 6 additions & 1 deletion openhands/server/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from openhands.server.session.agent_session import AgentSession
from openhands.server.session.conversation_init_data import ConversationInitData
from openhands.server.settings import Settings
from openhands.server.system_event import SystemEventHandler, SystemEventType
from openhands.storage.files import FileStore

ROOM_KEY = 'room:{sid}'
Expand All @@ -41,6 +42,7 @@ class Session:
config: AppConfig
file_store: FileStore
user_id: str | None
system_event_handler: SystemEventHandler

def __init__(
self,
Expand Down Expand Up @@ -87,7 +89,7 @@ async def initialize_agent(
AgentStateChangedObservation('', AgentState.LOADING),
EventSource.ENVIRONMENT,
)

self.system_event_handler.on_event(SystemEventType.CONVERSATION_START, self.sid)
agent_cls = settings.agent or self.config.default_agent
self.config.security.confirmation_mode = (
self.config.security.confirmation_mode
Expand Down Expand Up @@ -242,6 +244,9 @@ async def send_error(self, message: str):
async def _send_status_message(self, msg_type: str, id: str, message: str):
"""Sends a status message to the client."""
if msg_type == 'error':
self.system_event_handler.on_event(
SystemEventType.AGENT_STATUS_ERROR, self.sid
)
await self.agent_session.stop_agent_loop_for_error()
await self.send(
{'status_update': True, 'type': msg_type, 'id': id, 'message': message}
Expand Down
3 changes: 3 additions & 0 deletions openhands/server/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from openhands.server.conversation_manager.conversation_manager import (
ConversationManager,
)
from openhands.server.system_event import SystemEventHandler
from openhands.storage import get_file_store
from openhands.storage.conversation.conversation_store import ConversationStore
from openhands.storage.settings.settings_store import SettingsStore
Expand Down Expand Up @@ -44,3 +45,5 @@
ConversationStore, # type: ignore
server_config.conversation_store_class,
)

system_event_handler = SystemEventHandler()
34 changes: 34 additions & 0 deletions openhands/server/system_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from abc import ABC, abstractmethod
from enum import StrEnum
from typing import Any


class SystemEventType(StrEnum):
CONVERSATION_START = 'conversation_start'
AGENT_STATUS_ERROR = 'agent_status_error'


class SystemEventListener(ABC):
@abstractmethod
def on_event(self, type: SystemEventType, data: dict[str, Any]):
pass


class SystemEventHandler:
_listeners: list[SystemEventListener]

def __init__(self):
self._listeners = []

def add_listener(self, listener: SystemEventListener):
"""Forward future on_event calls to listener."""
self._listeners.append(listener)

def on_event(self, type: SystemEventType, session_id: str, **kwargs):
"""Forwards on_event calls to all listeners, swallowing exceptions."""
for listener in self._listeners:
try:
event_data = {'session_id': session_id, **kwargs}
listener.on_event(type, event_data)
except Exception as _:
pass
48 changes: 48 additions & 0 deletions tests/unit/test_system_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from unittest.mock import Mock

from openhands.server.system_event import (
SystemEventHandler,
SystemEventListener,
SystemEventType,
)


def test_event_forwarding():
"""Test that events are forwarded to all listeners."""
handler = SystemEventHandler()
mock_listener1 = Mock(spec=SystemEventListener)
mock_listener2 = Mock(spec=SystemEventListener)

handler.add_listener(mock_listener1)
handler.add_listener(mock_listener2)

session_id = 'test_session'
event_type = SystemEventType.CONVERSATION_START

handler.on_event(event_type, session_id)

expected_data = {'session_id': session_id}
mock_listener1.on_event.assert_called_once_with(event_type, expected_data)
mock_listener2.on_event.assert_called_once_with(event_type, expected_data)


def test_exception_handling():
"""Test that exceptions from listeners are caught and don't affect other listeners."""
handler = SystemEventHandler()
mock_listener1 = Mock(spec=SystemEventListener)
mock_listener1.on_event.side_effect = Exception('Test error')

mock_listener2 = Mock(spec=SystemEventListener)

handler.add_listener(mock_listener1)
handler.add_listener(mock_listener2)

session_id = 'test_session'
event_type = SystemEventType.AGENT_STATUS_ERROR

# Should not raise an exception
handler.on_event(event_type, session_id)

# Second listener should still be called
expected_data = {'session_id': session_id}
mock_listener2.on_event.assert_called_once_with(event_type, expected_data)

0 comments on commit a32a789

Please sign in to comment.