Skip to content

Commit

Permalink
Fix resource leaks in runtime and server components
Browse files Browse the repository at this point in the history
- Add proper cleanup of request sessions in runtime implementations
- Add proper cleanup of WebSocket connections in server
- Add proper cleanup sequence for runtime resources
- Add proper cleanup of Google Cloud storage clients
- Add error handling and logging for cleanup failures
- Add state tracking to prevent double-close issues
  • Loading branch information
openhands-agent committed Nov 13, 2024
1 parent a93f140 commit 8c1fa93
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 29 deletions.
45 changes: 25 additions & 20 deletions openhands/runtime/impl/remote/remote_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,29 +375,34 @@ def _wait_until_alive_impl(self):
raise RuntimeNotReadyError()

def close(self, timeout: int = 10):
if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
self.session.close()
if not hasattr(self, 'session') or self.session is None:
return
if self.runtime_id and self.session:

try:
if not (self.config.sandbox.keep_runtime_alive or self.attach_to_existing):
if self.runtime_id:
try:
response = self._send_request(
'POST',
f'{self.config.sandbox.remote_runtime_api_url}/stop',
is_retry=False,
json={'runtime_id': self.runtime_id},
timeout=timeout,
)
if response.status_code != 200:
self.log(
'error',
f'Failed to stop runtime: {response.text}',
)
else:
self.log('debug', 'Runtime stopped.')
except Exception as e:
self.log('error', f'Error stopping runtime: {e}')
finally:
try:
response = self._send_request(
'POST',
f'{self.config.sandbox.remote_runtime_api_url}/stop',
is_retry=False,
json={'runtime_id': self.runtime_id},
timeout=timeout,
)
if response.status_code != 200:
self.log(
'error',
f'Failed to stop runtime: {response.text}',
)
else:
self.log('debug', 'Runtime stopped.')
except Exception as e:
raise e
finally:
self.session.close()
except Exception as e:
self.log('error', f'Error closing session: {e}')

def run_action(self, action: Action, is_retry: bool = False) -> Observation:
if action.timeout is None:
Expand Down
37 changes: 29 additions & 8 deletions openhands/server/session/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,35 @@ def inner_close():
asyncio.get_event_loop().run_in_executor(None, inner_close)

async def _close(self):
if self.controller is not None:
end_state = self.controller.get_state()
end_state.save_to_session(self.sid, self.file_store)
await self.controller.close()
if self.runtime is not None:
self.runtime.close()
if self.security_analyzer is not None:
await self.security_analyzer.close()
try:
if self.controller is not None:
try:
end_state = self.controller.get_state()
end_state.save_to_session(self.sid, self.file_store)
await self.controller.close()
except Exception as e:
logger.error(f'Error closing controller: {e}')
finally:
self.controller = None

if self.runtime is not None:
try:
self.runtime.close()
except Exception as e:
logger.error(f'Error closing runtime: {e}')
finally:
self.runtime = None

if self.security_analyzer is not None:
try:
await self.security_analyzer.close()
except Exception as e:
logger.error(f'Error closing security analyzer: {e}')
finally:
self.security_analyzer = None
except Exception as e:
logger.error(f'Error in _close: {e}')
raise

async def stop_agent_loop_for_error(self):
if self.controller is not None:
Expand Down
15 changes: 14 additions & 1 deletion openhands/server/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,20 @@ def __init__(

def close(self):
self.is_alive = False
self.agent_session.close()
try:
if self.websocket is not None:
# Schedule websocket close in the event loop to avoid RuntimeError
asyncio.run_coroutine_threadsafe(
self.websocket.close(), self.loop
)
self.websocket = None
except Exception as e:
logger.error(f'Error closing websocket: {e}')
finally:
try:
self.agent_session.close()
except Exception as e:
logger.error(f'Error closing agent session: {e}')

async def loop_recv(self):
try:
Expand Down
15 changes: 15 additions & 0 deletions openhands/storage/google_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from google.api_core.exceptions import NotFound
from google.cloud import storage

from openhands.core.logger import openhands_logger as logger
from openhands.storage.files import FileStore


Expand All @@ -18,6 +19,20 @@ def __init__(self, bucket_name: Optional[str] = None) -> None:
bucket_name = os.environ['GOOGLE_CLOUD_BUCKET_NAME']
self.storage_client = storage.Client()
self.bucket = self.storage_client.bucket(bucket_name)
self._closed = False

def __del__(self):
self.close()

def close(self):
"""Close the storage client and cleanup resources."""
if not self._closed and hasattr(self, 'storage_client'):
try:
self.storage_client.close()
except Exception as e:
logger.error(f'Error closing storage client: {e}')
finally:
self._closed = True

def write(self, path: str, contents: str | bytes) -> None:
blob = self.bucket.blob(path)
Expand Down

0 comments on commit 8c1fa93

Please sign in to comment.