From 1a82bb2ce9b8e1c74ac2e5a60b24734763b52e00 Mon Sep 17 00:00:00 2001 From: Qiang Li Date: Wed, 20 Nov 2024 04:19:17 +0000 Subject: [PATCH] Rebase and remove docker runtime Add volume for .venv for dev build --- Makefile | 4 +- compose.yml | 8 +- containers/custom-sandbox/README.md | 39 +- containers/custom-sandbox/build.example.sh | 1 + containers/custom-sandbox/compose.example.yml | 2 +- containers/custom-sandbox/config.example.toml | 11 +- containers/dev/Dockerfile | 9 +- containers/dev/compose.yml | 4 +- openhands/core/config/sandbox_config.py | 6 +- openhands/runtime/__init__.py | 5 - .../runtime/impl/docker/docker_runtime.py | 493 ------------------ .../impl/eventstream/eventstream_runtime.py | 25 +- 12 files changed, 79 insertions(+), 528 deletions(-) delete mode 100644 openhands/runtime/impl/docker/docker_runtime.py diff --git a/Makefile b/Makefile index c4e1949b9621..4395879fbe3d 100644 --- a/Makefile +++ b/Makefile @@ -3,9 +3,9 @@ SHELL=/bin/bash # Variables BACKEND_HOST ?= "127.0.0.1" -BACKEND_PORT = 3000 +BACKEND_PORT ?= 3000 BACKEND_HOST_PORT = "$(BACKEND_HOST):$(BACKEND_PORT)" -FRONTEND_PORT = 3001 +FRONTEND_PORT ?= 3001 DEFAULT_WORKSPACE_DIR = "./workspace" DEFAULT_MODEL = "gpt-4o" CONFIG_FILE = config.toml diff --git a/compose.yml b/compose.yml index b54e270ab28e..3f6f6f7c4845 100644 --- a/compose.yml +++ b/compose.yml @@ -10,8 +10,14 @@ services: - SANDBOX_RUNTIME_CONTAINER_IMAGE=${SANDBOX_RUNTIME_CONTAINER_IMAGE:-ghcr.io/all-hands-ai/runtime:0.14-nikolaik} - SANDBOX_USER_ID=${SANDBOX_USER_ID:-1234} - WORKSPACE_MOUNT_PATH=${WORKSPACE_BASE:-$PWD/workspace} + # + - LOG_ALL_EVENTS=true + # See containers/custom-sandbox/README.md + - SANDBOX_REMOTE_RUNTIME_API_URL=http://host.docker.internal:8000/ + - SANDBOX_CONTAINER_NAME=${SANDBOX_CONTAINER_NAME:-} + - SANDBOX_DOCKER_ENDPOINT=${SANDBOX_DOCKER_ENDPOINT:-} ports: - - "3000:3000" + - "${FRONTEND_PORT:-3000}:3000" extra_hosts: - "host.docker.internal:host-gateway" volumes: diff --git 
a/containers/custom-sandbox/README.md b/containers/custom-sandbox/README.md index 4bff71abf89a..f98580b84161 100644 --- a/containers/custom-sandbox/README.md +++ b/containers/custom-sandbox/README.md @@ -1,17 +1,26 @@ # How to build custom Docker sandbox for OpenHands -[Docker](https://docs.docker.com/get-started/docker-overview/) is an open platform for developing, shipping, and running applications. +If you are looking for instructions for building custom sandboxes managed by OpenHands, please follow the guide [here](https://docs.all-hands.dev/modules/usage/how-to/custom-sandbox-guide). -This folder contains working examples to get you started. +This folder contains working examples for building a custom sandbox that you can run in the same Docker context or a different one. See Docker [context](https://docs.docker.com/engine/manage-resources/contexts/) for more info. ## Build the sandbox +Before attempting to build the sandbox, make sure you can [build OpenHands](https://github.com/All-Hands-AI/OpenHands/blob/main/Development.md). + +You may try to build inside a Docker container. + +```bash +make docker-dev +``` + ```bash # rename the files and make changes as you please cp build.example.sh build.sh cp compose.example.yml compose.yml -./build.sh +# ./build.sh +./build.sh ``` ## Start up sandbox @@ -20,14 +29,30 @@ cp compose.example.yml compose.yml docker compose up -d ``` -## Update config.toml and restart OpenHands +## Start OpenHands + +Update config.toml ```toml [core] -runtime="docker" [sandbox] -container_name="" -remote_runtime_api_url="http://:/" +# http://:/ +remote_runtime_api_url = "http://host.docker.internal:8000/" +# +container_name = "custom-sandbox" +# docker context endpoint +docker_endpoint = "unix:///var/run/docker.sock" +``` + +or use environment variables instead. 
+ +```bash +export SANDBOX_REMOTE_RUNTIME_API_URL=http://host.docker.internal:8000/ +export SANDBOX_CONTAINER_NAME=custom-sandbox +export SANDBOX_DOCKER_ENDPOINT=unix:///var/run/docker.sock +``` +```bash +make docker-run ``` diff --git a/containers/custom-sandbox/build.example.sh b/containers/custom-sandbox/build.example.sh index 7fffc5281d90..7336c3d24424 100755 --- a/containers/custom-sandbox/build.example.sh +++ b/containers/custom-sandbox/build.example.sh @@ -1,4 +1,5 @@ #!/bin/bash +# args: set -euo pipefail diff --git a/containers/custom-sandbox/compose.example.yml b/containers/custom-sandbox/compose.example.yml index 42d5cb0ea878..e2ca6e3a39ed 100644 --- a/containers/custom-sandbox/compose.example.yml +++ b/containers/custom-sandbox/compose.example.yml @@ -14,7 +14,7 @@ services: poetry run python -u -m openhands.runtime.action_execution_server 3000 --working-dir /workspace - --plugins agent_skills jupyter + --plugins agent_skills jupyter vscode --username openhands # the publicly exposed port 8000 must match the port in remote_runtime_api_url defined in the config.toml ports: diff --git a/containers/custom-sandbox/config.example.toml b/containers/custom-sandbox/config.example.toml index b51bdc642414..a05d5f697291 100644 --- a/containers/custom-sandbox/config.example.toml +++ b/containers/custom-sandbox/config.example.toml @@ -1,10 +1,11 @@ [core] -runtime="docker" [sandbox] -container_name="custom-sandbox" -remote_runtime_api_url="http://host.docker.internal:8000/" - -docker_url="unix:///var/run/docker.sock" +# http://:/ +remote_runtime_api_url = "http://host.docker.internal:8000/" +# +container_name = "custom-sandbox" +# docker context endpoint +docker_endpoint = "unix:///var/run/docker.sock" ### diff --git a/containers/dev/Dockerfile b/containers/dev/Dockerfile index eee65f30e5c4..76ecbfa282bd 100644 --- a/containers/dev/Dockerfile +++ b/containers/dev/Dockerfile @@ -106,18 +106,13 @@ RUN apt-get update && apt-get install -y \ WORKDIR /app -# cache build 
dependencies +# RUN \ - --mount=type=bind,source=./,target=/app/ \ + --mount=type=bind,rw,source=./,target=/app/ \ < dict: """Serialize fields to a dict for the frontend, including type hints, defaults, and whether it's optional.""" diff --git a/openhands/runtime/__init__.py b/openhands/runtime/__init__.py index eff9ff0894d4..16534daf6b56 100644 --- a/openhands/runtime/__init__.py +++ b/openhands/runtime/__init__.py @@ -21,11 +21,6 @@ def get_runtime_cls(name: str): return ModalRuntime elif name == 'runloop': return RunloopRuntime - elif name == 'docker': - logger.info('Using custom DockerRuntime') - from openhands.runtime.impl.docker.docker_runtime import DockerRuntime - - return DockerRuntime else: raise ValueError(f'Runtime {name} not supported') diff --git a/openhands/runtime/impl/docker/docker_runtime.py b/openhands/runtime/impl/docker/docker_runtime.py deleted file mode 100644 index 83a27b4f6c19..000000000000 --- a/openhands/runtime/impl/docker/docker_runtime.py +++ /dev/null @@ -1,493 +0,0 @@ -import os -import tempfile -import threading -from functools import lru_cache -from typing import Callable -from zipfile import ZipFile - -import docker -import requests -import tenacity - -from openhands.core.config import AppConfig -from openhands.core.logger import openhands_logger as logger -from openhands.events import EventStream -from openhands.events.action import ( - ActionConfirmationStatus, - BrowseInteractiveAction, - BrowseURLAction, - CmdRunAction, - FileEditAction, - FileReadAction, - FileWriteAction, - IPythonRunCellAction, -) -from openhands.events.action.action import Action -from openhands.events.observation import ( - FatalErrorObservation, - NullObservation, - Observation, - UserRejectObservation, -) -from openhands.events.serialization import event_to_dict, observation_from_dict -from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS -from openhands.runtime.base import Runtime -from openhands.runtime.plugins import 
PluginRequirement -from openhands.runtime.utils import find_available_tcp_port -from openhands.runtime.utils.request import send_request_with_retry -from openhands.utils.tenacity_stop import stop_if_should_exit - - -class LogBuffer: - """Synchronous buffer for Docker container logs. - - This class provides a thread-safe way to collect, store, and retrieve logs - from a Docker container. It uses a list to store log lines and provides methods - for appending, retrieving, and clearing logs. - """ - - def __init__(self, container: docker.models.containers.Container): - self.client_ready = False - self.init_msg = 'Runtime client initialized.' - - self.buffer: list[str] = [] - self.lock = threading.Lock() - self._stop_event = threading.Event() - self.log_generator = container.logs(stream=True, follow=True) - self.log_stream_thread = threading.Thread(target=self.stream_logs) - self.log_stream_thread.daemon = True - self.log_stream_thread.start() - - def append(self, log_line: str): - with self.lock: - self.buffer.append(log_line) - - def get_and_clear(self) -> list[str]: - with self.lock: - logs = list(self.buffer) - self.buffer.clear() - return logs - - def stream_logs(self): - """Stream logs from the Docker container in a separate thread. - - This method runs in its own thread to handle the blocking - operation of reading log lines from the Docker SDK's synchronous generator. - """ - try: - for log_line in self.log_generator: - if self._stop_event.is_set(): - break - if log_line: - decoded_line = log_line.decode('utf-8').rstrip() - self.append(decoded_line) - if self.init_msg in decoded_line: - self.client_ready = True - except Exception as e: - logger.error(f'Error streaming docker logs: {e}') - - def __del__(self): - if self.log_stream_thread.is_alive(): - logger.warn( - "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown." 
- ) - self.close(timeout=5) - - def close(self, timeout: float = 5.0): - self._stop_event.set() - self.log_stream_thread.join(timeout) - - -class DockerRuntime(Runtime): - """This runtime will subscribe the event stream. - - When receive an event, it will send the event to runtime-client which run inside the docker environment. - - Args: - config (AppConfig): The application configuration. - event_stream (EventStream): The event stream to subscribe to. - sid (str, optional): The session ID. Defaults to 'default'. - plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None. - env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None. - """ - - # - def init_base_runtime( - self, - config: AppConfig, - event_stream: EventStream, - sid: str = 'default', - plugins: list[PluginRequirement] | None = None, - env_vars: dict[str, str] | None = None, - status_message_callback: Callable | None = None, - attach_to_existing: bool = False, - ): - super().__init__( - config, - event_stream, - sid, - plugins, - env_vars, - status_message_callback, - attach_to_existing, - ) - - def __init__( - self, - config: AppConfig, - event_stream: EventStream, - sid: str = 'default', - plugins: list[PluginRequirement] | None = None, - env_vars: dict[str, str] | None = None, - status_message_callback: Callable | None = None, - attach_to_existing: bool = False, - ): - self.config = config - - # TODO: Just define one 'runtime_api_url'? 
- self.api_url = ( - config.sandbox.remote_runtime_api_url or config.sandbox.local_runtime_url - ) - self.session = requests.Session() - self.status_message_callback = status_message_callback - - self.docker_client: docker.DockerClient = self._init_docker_client( - config.sandbox.docker_url - ) - self.container_name = self.config.sandbox.container_name - self.container = None - self.action_semaphore = threading.Semaphore(1) - - # Buffer for container logs - self.log_buffer: LogBuffer | None = None - - self.init_base_runtime( - config, - event_stream, - sid, - plugins, - env_vars, - status_message_callback, - ) - - async def connect(self): - self.send_status_message('STATUS$STARTING_RUNTIME') - - self._init_container() - - logger.info('Waiting for client to become ready...') - self.send_status_message('STATUS$WAITING_FOR_CLIENT') - self._wait_until_alive() - - logger.info( - f'Container attached: {self.container_name}. Runtime client is ready.' - ) - - self.send_status_message(' ') - - @staticmethod - @lru_cache(maxsize=1) - def _init_docker_client(docker_url: str) -> docker.DockerClient: - try: - base_url = docker_url or os.environ.get('DOCKER_HOST') - return docker.DockerClient(base_url) - except Exception as ex: - logger.error( - 'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.' - ) - raise ex - - @tenacity.retry( - stop=tenacity.stop_after_attempt(5) | stop_if_should_exit(), - wait=tenacity.wait_exponential(multiplier=1, min=4, max=60), - ) - def _init_container(self): - try: - logger.info('Preparing to start container...') - - self.send_status_message('STATUS$PREPARING_CONTAINER') - self.container = self.docker_client.containers.get(self.container_name) - self.log_buffer = LogBuffer(self.container) - - logger.info(f'Container attached. 
{self.container_name}') - - self.send_status_message('STATUS$CONTAINER_STARTED') - except Exception as e: - logger.error( - f'Error: Instance {self.container_name} FAILED to attach to container!\n' - ) - logger.exception(e) - self.close() - raise e - - def _refresh_logs(self): - logger.debug('Getting container logs...') - - assert ( - self.log_buffer is not None - ), 'Log buffer is expected to be initialized when container is started' - - logs = self.log_buffer.get_and_clear() - if logs: - formatted_logs = '\n'.join([f' |{log}' for log in logs]) - logger.info( - '\n' - + '-' * 35 - + 'Container logs:' - + '-' * 35 - + f'\n{formatted_logs}' - + '\n' - + '-' * 80 - ) - - @tenacity.retry( - stop=tenacity.stop_after_delay(120) | stop_if_should_exit(), - wait=tenacity.wait_exponential(multiplier=2, min=1, max=20), - reraise=(ConnectionRefusedError,), - ) - def _wait_until_alive(self): - self._refresh_logs() - if not (self.log_buffer and self.log_buffer.client_ready): - raise RuntimeError('Runtime client is not ready.') - - response = send_request_with_retry( - self.session, - 'GET', - f'{self.api_url}/alive', - retry_exceptions=[ConnectionRefusedError], - timeout=300, # 5 minutes gives the container time to be alive 🧟‍♂️ - ) - if response.status_code == 200: - return - else: - msg = f'Action execution API is not alive. 
Response: {response}' - logger.error(msg) - raise RuntimeError(msg) - - def close(self): - """Closes the DockerRuntime and associated objects.""" - if self.log_buffer: - self.log_buffer.close() - - if self.session: - self.session.close() - - def run_action(self, action: Action) -> Observation: - if isinstance(action, FileEditAction): - return self.edit(action) - - # set timeout to default if not set - if action.timeout is None: - action.timeout = self.config.sandbox.timeout - - with self.action_semaphore: - if not action.runnable: - return NullObservation('') - if ( - hasattr(action, 'confirmation_state') - and action.confirmation_state - == ActionConfirmationStatus.AWAITING_CONFIRMATION - ): - return NullObservation('') - action_type = action.action # type: ignore[attr-defined] - if action_type not in ACTION_TYPE_TO_CLASS: - return FatalErrorObservation(f'Action {action_type} does not exist.') - if not hasattr(self, action_type): - return FatalErrorObservation( - f'Action {action_type} is not supported in the current runtime.' - ) - if ( - getattr(action, 'confirmation_state', None) - == ActionConfirmationStatus.REJECTED - ): - return UserRejectObservation( - 'Action has been rejected by the user! Waiting for further user input.' 
- ) - - self._refresh_logs() - - assert action.timeout is not None - - try: - response = send_request_with_retry( - self.session, - 'POST', - f'{self.api_url}/execute_action', - json={'action': event_to_dict(action)}, - timeout=action.timeout, - ) - if response.status_code == 200: - output = response.json() - obs = observation_from_dict(output) - obs._cause = action.id # type: ignore[attr-defined] - else: - logger.debug(f'action: {action}') - logger.debug(f'response: {response}') - error_message = response.text - logger.error(f'Error from server: {error_message}') - obs = FatalErrorObservation( - f'Action execution failed: {error_message}' - ) - except requests.Timeout: - logger.error('No response received within the timeout period.') - obs = FatalErrorObservation( - f'Action execution timed out after {action.timeout} seconds.' - ) - except Exception as e: - logger.error(f'Error during action execution: {e}') - obs = FatalErrorObservation(f'Action execution failed: {str(e)}') - self._refresh_logs() - return obs - - def run(self, action: CmdRunAction) -> Observation: - return self.run_action(action) - - def run_ipython(self, action: IPythonRunCellAction) -> Observation: - return self.run_action(action) - - def read(self, action: FileReadAction) -> Observation: - return self.run_action(action) - - def write(self, action: FileWriteAction) -> Observation: - return self.run_action(action) - - def browse(self, action: BrowseURLAction) -> Observation: - return self.run_action(action) - - def browse_interactive(self, action: BrowseInteractiveAction) -> Observation: - return self.run_action(action) - - # ==================================================================== - # Implement these methods (for file operations) in the subclass - # ==================================================================== - - def copy_to( - self, host_src: str, sandbox_dest: str, recursive: bool = False - ) -> None: - if not os.path.exists(host_src): - raise FileNotFoundError(f'Source 
file {host_src} does not exist') - - self._refresh_logs() - try: - if recursive: - # For recursive copy, create a zip file - with tempfile.NamedTemporaryFile( - suffix='.zip', delete=False - ) as temp_zip: - temp_zip_path = temp_zip.name - - with ZipFile(temp_zip_path, 'w') as zipf: - for root, _, files in os.walk(host_src): - for file in files: - file_path = os.path.join(root, file) - arcname = os.path.relpath( - file_path, os.path.dirname(host_src) - ) - zipf.write(file_path, arcname) - - upload_data = {'file': open(temp_zip_path, 'rb')} - else: - # For single file copy - upload_data = {'file': open(host_src, 'rb')} - - params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()} - - response = send_request_with_retry( - self.session, - 'POST', - f'{self.api_url}/upload_file', - files=upload_data, - params=params, - timeout=300, - ) - if response.status_code == 200: - return - else: - error_message = response.text - raise Exception(f'Copy operation failed: {error_message}') - - except requests.Timeout: - raise TimeoutError('Copy operation timed out') - except Exception as e: - raise RuntimeError(f'Copy operation failed: {str(e)}') - finally: - if recursive: - os.unlink(temp_zip_path) - logger.info(f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}') - self._refresh_logs() - - def list_files(self, path: str | None = None) -> list[str]: - """List files in the sandbox. - - If path is None, list files in the sandbox's initial working directory (e.g., /workspace). 
- """ - self._refresh_logs() - try: - data = {} - if path is not None: - data['path'] = path - - response = send_request_with_retry( - self.session, - 'POST', - f'{self.api_url}/list_files', - json=data, - timeout=30, # 30 seconds because the container should already be alive - ) - if response.status_code == 200: - response_json = response.json() - assert isinstance(response_json, list) - return response_json - else: - error_message = response.text - raise Exception(f'List files operation failed: {error_message}') - except requests.Timeout: - raise TimeoutError('List files operation timed out') - except Exception as e: - raise RuntimeError(f'List files operation failed: {str(e)}') - - def copy_from(self, path: str) -> bytes: - """Zip all files in the sandbox and return as a stream of bytes.""" - self._refresh_logs() - try: - params = {'path': path} - response = send_request_with_retry( - self.session, - 'GET', - f'{self.api_url}/download_files', - params=params, - stream=True, - timeout=30, - ) - if response.status_code == 200: - data = response.content - return data - else: - error_message = response.text - raise Exception(f'Copy operation failed: {error_message}') - except requests.Timeout: - raise TimeoutError('Copy operation timed out') - except Exception as e: - raise RuntimeError(f'Copy operation failed: {str(e)}') - - def _is_port_in_use_docker(self, port): - containers = self.docker_client.containers.list() - for container in containers: - container_ports = container.ports - if str(port) in str(container_ports): - return True - return False - - def _find_available_port(self, max_attempts=5): - port = 39999 - for _ in range(max_attempts): - port = find_available_tcp_port(30000, 39999) - if not self._is_port_in_use_docker(port): - return port - # If no port is found after max_attempts, return the last tried port - return port - - def send_status_message(self, message: str): - """Sends a status message if the callback function was provided.""" - if 
self.status_message_callback: - self.status_message_callback(message) diff --git a/openhands/runtime/impl/eventstream/eventstream_runtime.py b/openhands/runtime/impl/eventstream/eventstream_runtime.py index e65c36cc67c6..61d3d8f10829 100644 --- a/openhands/runtime/impl/eventstream/eventstream_runtime.py +++ b/openhands/runtime/impl/eventstream/eventstream_runtime.py @@ -171,8 +171,9 @@ def __init__( self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}' self.session = requests.Session() self.status_callback = status_callback - - self.docker_client: docker.DockerClient = self._init_docker_client() + self.docker_client: docker.DockerClient = self._init_docker_client( + self.config.sandbox.docker_endpoint + ) self.base_container_image = self.config.sandbox.base_container_image self.runtime_container_image = self.config.sandbox.runtime_container_image self.container_name = CONTAINER_NAME_PREFIX + sid @@ -190,6 +191,14 @@ def __init__( f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}', ) + # if docker context endpoint is set, custom sandbox is assumed. 
+ if self.config.sandbox.docker_endpoint: + attach_to_existing = True + self.api_url = self.config.sandbox.remote_runtime_api_url + if not self.config.sandbox.container_name: + raise ValueError('sandbox container_name cannot be None') + self.container_name = self.config.sandbox.container_name + self.init_base_runtime( config, event_stream, @@ -259,8 +268,10 @@ async def connect(self): @staticmethod @lru_cache(maxsize=1) - def _init_docker_client() -> docker.DockerClient: + def _init_docker_client(docker_endpoint: str | None) -> docker.DockerClient: try: + if docker_endpoint: + return docker.DockerClient(docker_endpoint) return docker.from_env() except Exception as ex: logger.error( @@ -391,6 +402,14 @@ def _init_container(self): def _attach_to_container(self): self._container_port = 0 self.container = self.docker_client.containers.get(self.container_name) + + if self.config.sandbox.docker_endpoint: + self.log( + 'debug', + f'attached to sandbox: {self.container_name} {self.api_url}', + ) + return + for port in self.container.attrs['NetworkSettings']['Ports']: # type: ignore self._container_port = int(port.split('/')[0]) break