diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 78b27c89ff7f..04f5f5722334 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -15,6 +15,7 @@ LLMNoActionError, LLMResponseError, ) +from openhands.core.logger import LOG_ALL_EVENTS from openhands.core.logger import openhands_logger as logger from openhands.core.schema import AgentState from openhands.events import EventSource, EventStream, EventStreamSubscriber @@ -506,8 +507,7 @@ async def _step(self) -> None: await self.update_state_after_step() - # Use info level if LOG_ALL_EVENTS is set - log_level = 'info' if os.getenv('LOG_ALL_EVENTS') in ('true', '1') else 'debug' + log_level = 'info' if LOG_ALL_EVENTS else 'debug' self.log(log_level, str(action), extra={'msg_type': 'ACTION'}) async def _delegate_step(self): diff --git a/openhands/core/logger.py b/openhands/core/logger.py index 20a4a4d6581a..c3826130201a 100644 --- a/openhands/core/logger.py +++ b/openhands/core/logger.py @@ -16,6 +16,8 @@ LOG_TO_FILE = os.getenv('LOG_TO_FILE', 'False').lower() in ['true', '1', 'yes'] DISABLE_COLOR_PRINTING = False +LOG_ALL_EVENTS = os.getenv('LOG_ALL_EVENTS', 'False').lower() in ['true', '1', 'yes'] + ColorType = Literal[ 'red', 'green', @@ -65,8 +67,11 @@ def format(self, record): return f'{time_str} - {name_str}:{level_str}: {record.filename}:{record.lineno}\n{msg_type_color}\n{msg}' return f'{time_str} - {msg_type_color}\n{msg}' elif msg_type == 'STEP': - msg = '\n\n==============\n' + record.msg + '\n' - return f'{msg}' + if LOG_ALL_EVENTS: + msg = '\n\n==============\n' + record.msg + '\n' + return f'{msg}' + else: + return record.msg return super().format(record) diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index b12c501c19f3..74891a7d52b0 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -47,11 +47,19 @@ } -class RuntimeNotReadyError(Exception): +class RuntimeUnavailableError(Exception): pass -class RuntimeDisconnectedError(Exception): +class RuntimeNotReadyError(RuntimeUnavailableError): + pass + + +class RuntimeDisconnectedError(RuntimeUnavailableError): + pass + + +class RuntimeNotFoundError(RuntimeUnavailableError): pass diff --git a/openhands/runtime/impl/eventstream/eventstream_runtime.py b/openhands/runtime/impl/eventstream/eventstream_runtime.py index 77cbaf338281..a59683282980 100644 --- a/openhands/runtime/impl/eventstream/eventstream_runtime.py +++ b/openhands/runtime/impl/eventstream/eventstream_runtime.py @@ -34,7 +34,11 @@ ) from openhands.events.serialization import event_to_dict, observation_from_dict from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS -from openhands.runtime.base import Runtime +from openhands.runtime.base import ( + Runtime, + RuntimeDisconnectedError, + RuntimeNotFoundError, +) from openhands.runtime.builder import DockerRuntimeBuilder from openhands.runtime.impl.eventstream.containers import remove_all_containers from openhands.runtime.plugins import PluginRequirement @@ -422,10 +426,22 @@ def _refresh_logs(self): @tenacity.retry( stop=tenacity.stop_after_delay(120) | stop_if_should_exit(), - reraise=(ConnectionRefusedError,), + retry=tenacity.retry_if_exception_type( + (ConnectionError, requests.exceptions.ConnectionError) + ), + reraise=True, wait=tenacity.wait_fixed(2), ) def _wait_until_alive(self): + try: + container = self.docker_client.containers.get(self.container_name) + if container.status == 'exited': + raise RuntimeDisconnectedError( + f'Container {self.container_name} has exited.' + ) + except docker.errors.NotFound: + raise RuntimeNotFoundError(f'Container {self.container_name} not found.') + self._refresh_logs() if not self.log_buffer: raise RuntimeError('Runtime client is not ready.') diff --git a/openhands/runtime/impl/remote/remote_runtime.py b/openhands/runtime/impl/remote/remote_runtime.py index 8c9843de98b4..f0ef0cbec19a 100644 --- a/openhands/runtime/impl/remote/remote_runtime.py +++ b/openhands/runtime/impl/remote/remote_runtime.py @@ -31,6 +31,7 @@ from openhands.runtime.base import ( Runtime, RuntimeDisconnectedError, + RuntimeNotFoundError, RuntimeNotReadyError, ) from openhands.runtime.builder.remote import RemoteRuntimeBuilder @@ -109,7 +110,9 @@ def _start_or_attach_to_runtime(self): if existing_runtime: self.log('debug', f'Using existing runtime with ID: {self.runtime_id}') elif self.attach_to_existing: - raise RuntimeError('Could not find existing runtime to attach to.') + raise RuntimeNotFoundError( + f'Could not find existing runtime for SID: {self.sid}' + ) else: self.send_status_message('STATUS$STARTING_CONTAINER') if self.config.sandbox.runtime_container_image is None: diff --git a/openhands/server/listen.py b/openhands/server/listen.py index d18bea277404..4ab814d43e16 100644 --- a/openhands/server/listen.py +++ b/openhands/server/listen.py @@ -34,6 +34,7 @@ Request, UploadFile, WebSocket, + WebSocketDisconnect, status, ) from fastapi.responses import FileResponse, JSONResponse @@ -238,7 +239,8 @@ async def attach_session(request: Request, call_next): request.state.conversation = await session_manager.attach_to_conversation( request.state.sid ) - if request.state.conversation is None: + if not request.state.conversation: + logger.error(f'Runtime not found for session: {request.state.sid}') return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={'error': 'Session not found'}, @@ -344,7 +346,13 @@ async def websocket_endpoint(websocket: WebSocket): latest_event_id = -1 if websocket.query_params.get('latest_event_id'): - latest_event_id = int(websocket.query_params.get('latest_event_id')) + try: + latest_event_id = int(websocket.query_params.get('latest_event_id')) + except ValueError: + logger.warning( + f'Invalid latest_event_id: {websocket.query_params.get("latest_event_id")}' + ) + pass async_stream = AsyncEventStreamWrapper( session.agent_session.event_stream, latest_event_id + 1 @@ -361,7 +369,14 @@ async def websocket_endpoint(websocket: WebSocket): ), ): continue - await websocket.send_json(event_to_dict(event)) + try: + await websocket.send_json(event_to_dict(event)) + except WebSocketDisconnect: + logger.warning( + 'Websocket disconnected while sending event history, before loop started' + ) + session.close() + return await session.loop_recv() diff --git a/openhands/server/session/agent_session.py b/openhands/server/session/agent_session.py index 8f9d20a5dc6e..76f6e2aa8bcb 100644 --- a/openhands/server/session/agent_session.py +++ b/openhands/server/session/agent_session.py @@ -11,7 +11,7 @@ from openhands.events.event import EventSource from openhands.events.stream import EventStream from openhands.runtime import get_runtime_cls -from openhands.runtime.base import Runtime +from openhands.runtime.base import Runtime, RuntimeUnavailableError from openhands.security import SecurityAnalyzer, options from openhands.storage.files import FileStore @@ -194,13 +194,13 @@ async def _create_runtime( try: await self.runtime.connect() - except Exception as e: + except RuntimeUnavailableError as e: logger.error(f'Runtime initialization failed: {e}', exc_info=True) if self._status_callback: self._status_callback( 'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e) ) - raise + return if self.runtime is not None: logger.debug( diff --git a/openhands/server/session/manager.py b/openhands/server/session/manager.py index f746b3676e29..790b7c4bd1eb 100644 --- a/openhands/server/session/manager.py +++ b/openhands/server/session/manager.py @@ -6,6 +6,7 @@ from openhands.core.config import AppConfig from openhands.core.logger import openhands_logger as logger from openhands.events.stream import session_exists +from openhands.runtime.base import RuntimeUnavailableError from openhands.server.session.conversation import Conversation from openhands.server.session.session import Session from openhands.storage.files import FileStore @@ -26,7 +27,11 @@ async def attach_to_conversation(self, sid: str) -> Conversation | None: if not await session_exists(sid, self.file_store): return None c = Conversation(sid, file_store=self.file_store, config=self.config) - await c.connect() + try: + await c.connect() + except RuntimeUnavailableError as e: + logger.error(f'Error connecting to conversation {c.sid}: {e}') + return None end_time = time.time() logger.info( f'Conversation {c.sid} connected in {end_time - start_time} seconds'