From 8c1fa936f97e5489df20cecc0ab0d1772ef4af07 Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 13 Nov 2024 22:28:21 +0000 Subject: [PATCH] Fix resource leaks in runtime and server components - Add proper cleanup of request sessions in runtime implementations - Add proper cleanup of WebSocket connections in server - Add proper cleanup sequence for runtime resources - Add proper cleanup of Google Cloud storage clients - Add error handling and logging for cleanup failures - Add state tracking to prevent double-close issues --- .../runtime/impl/remote/remote_runtime.py | 45 ++++++++++--------- openhands/server/session/agent_session.py | 37 +++++++++++---- openhands/server/session/session.py | 15 ++++++- openhands/storage/google_cloud.py | 15 +++++++ 4 files changed, 83 insertions(+), 29 deletions(-) diff --git a/openhands/runtime/impl/remote/remote_runtime.py b/openhands/runtime/impl/remote/remote_runtime.py index 8c9843de98b4..f61b11bcddb6 100644 --- a/openhands/runtime/impl/remote/remote_runtime.py +++ b/openhands/runtime/impl/remote/remote_runtime.py @@ -375,29 +375,34 @@ def _wait_until_alive_impl(self): raise RuntimeNotReadyError() def close(self, timeout: int = 10): - if self.config.sandbox.keep_runtime_alive or self.attach_to_existing: - self.session.close() + if not hasattr(self, 'session') or self.session is None: return - if self.runtime_id and self.session: + + try: + if not (self.config.sandbox.keep_runtime_alive or self.attach_to_existing): + if self.runtime_id: + try: + response = self._send_request( + 'POST', + f'{self.config.sandbox.remote_runtime_api_url}/stop', + is_retry=False, + json={'runtime_id': self.runtime_id}, + timeout=timeout, + ) + if response.status_code != 200: + self.log( + 'error', + f'Failed to stop runtime: {response.text}', + ) + else: + self.log('debug', 'Runtime stopped.') + except Exception as e: + self.log('error', f'Error stopping runtime: {e}') + finally: try: - response = self._send_request( - 'POST', - f'{self.config.sandbox.remote_runtime_api_url}/stop', - is_retry=False, - json={'runtime_id': self.runtime_id}, - timeout=timeout, - ) - if response.status_code != 200: - self.log( - 'error', - f'Failed to stop runtime: {response.text}', - ) - else: - self.log('debug', 'Runtime stopped.') - except Exception as e: - raise e - finally: self.session.close() + except Exception as e: + self.log('error', f'Error closing session: {e}') def run_action(self, action: Action, is_retry: bool = False) -> Observation: if action.timeout is None: diff --git a/openhands/server/session/agent_session.py b/openhands/server/session/agent_session.py index 8f9d20a5dc6e..b32283e0dfe5 100644 --- a/openhands/server/session/agent_session.py +++ b/openhands/server/session/agent_session.py @@ -138,14 +138,35 @@ def inner_close(): asyncio.get_event_loop().run_in_executor(None, inner_close) async def _close(self): - if self.controller is not None: - end_state = self.controller.get_state() - end_state.save_to_session(self.sid, self.file_store) - await self.controller.close() - if self.runtime is not None: - self.runtime.close() - if self.security_analyzer is not None: - await self.security_analyzer.close() + try: + if self.controller is not None: + try: + end_state = self.controller.get_state() + end_state.save_to_session(self.sid, self.file_store) + await self.controller.close() + except Exception as e: + logger.error(f'Error closing controller: {e}') + finally: + self.controller = None + + if self.runtime is not None: + try: + self.runtime.close() + except Exception as e: + logger.error(f'Error closing runtime: {e}') + finally: + self.runtime = None + + if self.security_analyzer is not None: + try: + await self.security_analyzer.close() + except Exception as e: + logger.error(f'Error closing security analyzer: {e}') + finally: + self.security_analyzer = None + except Exception as e: + logger.error(f'Error in _close: {e}') + raise async def stop_agent_loop_for_error(self): if self.controller is not None: diff --git a/openhands/server/session/session.py b/openhands/server/session/session.py index e70d80e84f3e..9c5dbc814908 100644 --- a/openhands/server/session/session.py +++ b/openhands/server/session/session.py @@ -51,7 +51,20 @@ def __init__( def close(self): self.is_alive = False - self.agent_session.close() + try: + if self.websocket is not None: + # Schedule websocket close in the event loop to avoid RuntimeError + asyncio.run_coroutine_threadsafe( + self.websocket.close(), self.loop + ) + self.websocket = None + except Exception as e: + logger.error(f'Error closing websocket: {e}') + finally: + try: + self.agent_session.close() + except Exception as e: + logger.error(f'Error closing agent session: {e}') async def loop_recv(self): try: diff --git a/openhands/storage/google_cloud.py b/openhands/storage/google_cloud.py index bbd2da273098..ba76c32fe3ec 100644 --- a/openhands/storage/google_cloud.py +++ b/openhands/storage/google_cloud.py @@ -4,6 +4,7 @@ from google.api_core.exceptions import NotFound from google.cloud import storage +from openhands.core.logger import openhands_logger as logger from openhands.storage.files import FileStore @@ -18,6 +19,20 @@ def __init__(self, bucket_name: Optional[str] = None) -> None: bucket_name = os.environ['GOOGLE_CLOUD_BUCKET_NAME'] self.storage_client = storage.Client() self.bucket = self.storage_client.bucket(bucket_name) + self._closed = False + + def __del__(self): + self.close() + + def close(self): + """Close the storage client and cleanup resources.""" + if not self._closed and hasattr(self, 'storage_client'): + try: + self.storage_client.close() + except Exception as e: + logger.error(f'Error closing storage client: {e}') + finally: + self._closed = True def write(self, path: str, contents: str | bytes) -> None: blob = self.bucket.blob(path)