Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle exceptions more explicitly #4971

Merged
merged 9 commits (base and head branch names were not captured in this extract)
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions openhands/controller/agent_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
LLMNoActionError,
LLMResponseError,
)
from openhands.core.logger import LOG_ALL_EVENTS
from openhands.core.logger import openhands_logger as logger
from openhands.core.schema import AgentState
from openhands.events import EventSource, EventStream, EventStreamSubscriber
Expand Down Expand Up @@ -506,8 +507,7 @@ async def _step(self) -> None:

await self.update_state_after_step()

# Use info level if LOG_ALL_EVENTS is set
log_level = 'info' if os.getenv('LOG_ALL_EVENTS') in ('true', '1') else 'debug'
log_level = 'info' if LOG_ALL_EVENTS else 'debug'
self.log(log_level, str(action), extra={'msg_type': 'ACTION'})

async def _delegate_step(self):
Expand Down
9 changes: 7 additions & 2 deletions openhands/core/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
LOG_TO_FILE = os.getenv('LOG_TO_FILE', 'False').lower() in ['true', '1', 'yes']
DISABLE_COLOR_PRINTING = False

LOG_ALL_EVENTS = os.getenv('LOG_ALL_EVENTS', 'False').lower() in ['true', '1', 'yes']

ColorType = Literal[
'red',
'green',
Expand Down Expand Up @@ -65,8 +67,11 @@ def format(self, record):
return f'{time_str} - {name_str}:{level_str}: {record.filename}:{record.lineno}\n{msg_type_color}\n{msg}'
return f'{time_str} - {msg_type_color}\n{msg}'
elif msg_type == 'STEP':
msg = '\n\n==============\n' + record.msg + '\n'
return f'{msg}'
if LOG_ALL_EVENTS:
msg = '\n\n==============\n' + record.msg + '\n'
return f'{msg}'
else:
return record.msg
return super().format(record)


Expand Down
12 changes: 10 additions & 2 deletions openhands/runtime/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,19 @@
}


class RuntimeNotReadyError(Exception):
class RuntimeUnavailableError(Exception):
pass


class RuntimeDisconnectedError(Exception):
class RuntimeNotReadyError(RuntimeUnavailableError):
pass


class RuntimeDisconnectedError(RuntimeUnavailableError):
pass


class RuntimeNotFoundError(RuntimeUnavailableError):
pass


Expand Down
20 changes: 18 additions & 2 deletions openhands/runtime/impl/eventstream/eventstream_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@
)
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.base import Runtime
from openhands.runtime.base import (
Runtime,
RuntimeDisconnectedError,
RuntimeNotFoundError,
)
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.impl.eventstream.containers import remove_all_containers
from openhands.runtime.plugins import PluginRequirement
Expand Down Expand Up @@ -422,10 +426,22 @@ def _refresh_logs(self):

@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
reraise=(ConnectionRefusedError,),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def _wait_until_alive(self):
try:
container = self.docker_client.containers.get(self.container_name)
if container.status == 'exited':
raise RuntimeDisconnectedError(
f'Container {self.container_name} has exited.'
)
except docker.errors.NotFound:
raise RuntimeNotFoundError(f'Container {self.container_name} not found.')

self._refresh_logs()
if not self.log_buffer:
raise RuntimeError('Runtime client is not ready.')
Expand Down
5 changes: 4 additions & 1 deletion openhands/runtime/impl/remote/remote_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from openhands.runtime.base import (
Runtime,
RuntimeDisconnectedError,
RuntimeNotFoundError,
RuntimeNotReadyError,
)
from openhands.runtime.builder.remote import RemoteRuntimeBuilder
Expand Down Expand Up @@ -109,7 +110,9 @@ def _start_or_attach_to_runtime(self):
if existing_runtime:
self.log('debug', f'Using existing runtime with ID: {self.runtime_id}')
elif self.attach_to_existing:
raise RuntimeError('Could not find existing runtime to attach to.')
raise RuntimeNotFoundError(
f'Could not find existing runtime for SID: {self.sid}'
)
else:
self.send_status_message('STATUS$STARTING_CONTAINER')
if self.config.sandbox.runtime_container_image is None:
Expand Down
21 changes: 18 additions & 3 deletions openhands/server/listen.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
Request,
UploadFile,
WebSocket,
WebSocketDisconnect,
status,
)
from fastapi.responses import FileResponse, JSONResponse
Expand Down Expand Up @@ -238,7 +239,8 @@ async def attach_session(request: Request, call_next):
request.state.conversation = await session_manager.attach_to_conversation(
request.state.sid
)
if request.state.conversation is None:
if not request.state.conversation:
logger.error(f'Runtime not found for session: {request.state.sid}')
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'Session not found'},
Expand Down Expand Up @@ -344,7 +346,13 @@ async def websocket_endpoint(websocket: WebSocket):

latest_event_id = -1
if websocket.query_params.get('latest_event_id'):
latest_event_id = int(websocket.query_params.get('latest_event_id'))
try:
latest_event_id = int(websocket.query_params.get('latest_event_id'))
except ValueError:
logger.warning(
f'Invalid latest_event_id: {websocket.query_params.get("latest_event_id")}'
)
pass

async_stream = AsyncEventStreamWrapper(
session.agent_session.event_stream, latest_event_id + 1
Expand All @@ -361,7 +369,14 @@ async def websocket_endpoint(websocket: WebSocket):
),
):
continue
await websocket.send_json(event_to_dict(event))
try:
await websocket.send_json(event_to_dict(event))
except WebSocketDisconnect:
logger.warning(
'Websocket disconnected while sending event history, before loop started'
)
session.close()
return

await session.loop_recv()

Expand Down
6 changes: 3 additions & 3 deletions openhands/server/session/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from openhands.events.event import EventSource
from openhands.events.stream import EventStream
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.base import Runtime, RuntimeUnavailableError
from openhands.security import SecurityAnalyzer, options
from openhands.storage.files import FileStore

Expand Down Expand Up @@ -194,13 +194,13 @@ async def _create_runtime(

try:
await self.runtime.connect()
except Exception as e:
except RuntimeUnavailableError as e:
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
if self._status_callback:
self._status_callback(
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
)
raise
return

if self.runtime is not None:
logger.debug(
Expand Down
7 changes: 6 additions & 1 deletion openhands/server/session/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from openhands.core.config import AppConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events.stream import session_exists
from openhands.runtime.base import RuntimeUnavailableError
from openhands.server.session.conversation import Conversation
from openhands.server.session.session import Session
from openhands.storage.files import FileStore
Expand All @@ -26,7 +27,11 @@ async def attach_to_conversation(self, sid: str) -> Conversation | None:
if not await session_exists(sid, self.file_store):
return None
c = Conversation(sid, file_store=self.file_store, config=self.config)
await c.connect()
try:
await c.connect()
except RuntimeUnavailableError as e:
logger.error(f'Error connecting to conversation {c.sid}: {e}')
return None
end_time = time.time()
logger.info(
f'Conversation {c.sid} connected in {end_time - start_time} seconds'
Expand Down
Loading