Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support directly stream logs from container to stdout in debug mode #5408

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 9 additions & 96 deletions openhands/runtime/impl/eventstream/eventstream_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from openhands.runtime.impl.eventstream.containers import remove_all_containers
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
Expand All @@ -58,68 +59,6 @@ def remove_all_runtime_containers():
atexit.register(remove_all_runtime_containers)


class LogBuffer:
    """Synchronous buffer for Docker container logs.

    A background daemon thread iterates over the container's streaming log
    generator and appends every decoded, right-stripped line to an in-memory
    list. The list is guarded by a lock, so `append` and `get_and_clear` can
    be called from different threads.
    """

    def __init__(self, container: 'docker.models.containers.Container', logFn: Callable):
        """Start streaming logs from `container` in a background thread.

        Args:
            container: Object whose `logs(stream=True, follow=True)` call
                returns an iterable of byte strings.
            logFn: Callable invoked as `logFn(level, message)` to report
                problems encountered while streaming.
        """
        self.init_msg = 'Runtime client initialized.'

        # Everything stream_logs() touches must exist before the thread starts;
        # otherwise an early failure in the thread hits a missing attribute.
        self.log = logFn
        self.buffer: list[str] = []
        self.lock = threading.Lock()
        self._stop_event = threading.Event()
        self.log_generator = container.logs(stream=True, follow=True)
        self.log_stream_thread = threading.Thread(target=self.stream_logs)
        self.log_stream_thread.daemon = True
        self.log_stream_thread.start()

    def append(self, log_line: str):
        """Add one log line to the buffer under the lock."""
        with self.lock:
            self.buffer.append(log_line)

    def get_and_clear(self) -> list[str]:
        """Return a copy of all buffered lines and empty the buffer."""
        with self.lock:
            logs = list(self.buffer)
            self.buffer.clear()
        return logs

    def stream_logs(self):
        """Stream logs from the Docker container in a separate thread.

        This method runs in its own thread to handle the blocking
        operation of reading log lines from the Docker SDK's synchronous generator.
        Any exception raised while reading is reported through the log function.
        """
        try:
            for log_line in self.log_generator:
                if self._stop_event.is_set():
                    break
                if log_line:
                    # Container output is arbitrary bytes: substitute undecodable
                    # sequences instead of aborting the whole stream on one bad byte.
                    decoded_line = log_line.decode('utf-8', errors='replace').rstrip()
                    self.append(decoded_line)
        except Exception as e:
            self.log('error', f'Error streaming docker logs: {e}')

    def __del__(self):
        # __init__ may have failed before the thread attribute was assigned.
        thread = getattr(self, 'log_stream_thread', None)
        if thread is not None and thread.is_alive():
            self.log(
                'warn',
                "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown.",
            )
            self.close(timeout=5)

    def close(self, timeout: float = 5.0):
        """Signal the streaming thread to stop and wait up to `timeout` seconds for it.

        Args:
            timeout: Maximum number of seconds to wait for the thread to finish.
        """
        self._stop_event.set()
        self.log_stream_thread.join(timeout)
        # Close the log generator to release the file descriptor
        if hasattr(self.log_generator, 'close'):
            self.log_generator.close()

class EventStreamRuntime(Runtime):
"""This runtime will subscribe the event stream.
When it receives an event, it sends the event to the runtime-client, which runs inside the Docker environment.
Expand Down Expand Up @@ -186,7 +125,7 @@ def __init__(
self.runtime_builder = DockerRuntimeBuilder(self.docker_client)

# Buffer for container logs
self.log_buffer: LogBuffer | None = None
self.log_streamer: LogStreamer | None = None

self.init_base_runtime(
config,
Expand Down Expand Up @@ -241,7 +180,7 @@ async def connect(self):
f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
)

self.log_buffer = LogBuffer(self.container, self.log)
self.log_streamer = LogStreamer(self.container, self.log)

if not self.attach_to_existing:
self.log('info', f'Waiting for client to become ready at {self.api_url}...')
Expand Down Expand Up @@ -407,27 +346,6 @@ def _attach_to_container(self):
f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
)

def _refresh_logs(self):
    """Drain the container log buffer and emit its contents at debug level.

    Logs a short 'Getting container logs...' message, then takes every line
    currently held by `self.log_buffer`. If there are any, they are written
    as one framed debug message; an empty buffer produces no further output.

    Raises:
        AssertionError: If `self.log_buffer` has not been initialized.
    """
    self.log('debug', 'Getting container logs...')

    # Explicit raise rather than an `assert` statement so the check is not
    # stripped when Python runs with -O; exception type and message are unchanged.
    if self.log_buffer is None:
        raise AssertionError(
            'Log buffer is expected to be initialized when container is started'
        )

    logs = self.log_buffer.get_and_clear()
    if not logs:
        return

    formatted_logs = '\n'.join(f' |{log}' for log in logs)
    self.log(
        'debug',
        '\n'
        + '-' * 35
        + 'Container logs:'
        + '-' * 35
        + f'\n{formatted_logs}'
        + '\n'
        + '-' * 80,
    )

@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
retry=tenacity.retry_if_exception_type(
Expand All @@ -446,8 +364,7 @@ def _wait_until_alive(self):
except docker.errors.NotFound:
raise RuntimeNotFoundError(f'Container {self.container_name} not found.')

self._refresh_logs()
if not self.log_buffer:
if not self.log_streamer:
raise RuntimeError('Runtime client is not ready.')

with send_request(
Expand All @@ -464,8 +381,8 @@ def close(self, rm_all_containers: bool | None = None):
Parameters:
- rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
"""
if self.log_buffer:
self.log_buffer.close()
if self.log_streamer:
self.log_streamer.close()

if self.session:
self.session.close()
Expand Down Expand Up @@ -513,8 +430,6 @@ def run_action(self, action: Action) -> Observation:
'Action has been rejected by the user! Waiting for further user input.'
)

self._refresh_logs()

assert action.timeout is not None

try:
Expand All @@ -533,7 +448,7 @@ def run_action(self, action: Action) -> Observation:
raise RuntimeError(
f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
)
self._refresh_logs()

return obs

def run(self, action: CmdRunAction) -> Observation:
Expand Down Expand Up @@ -564,7 +479,6 @@ def copy_to(
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')

self._refresh_logs()
try:
if recursive:
# For recursive copy, create a zip file
Expand Down Expand Up @@ -609,14 +523,13 @@ def copy_to(
self.log(
'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
)
self._refresh_logs()

def list_files(self, path: str | None = None) -> list[str]:
"""List files in the sandbox.

If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
self._refresh_logs()

try:
data = {}
if path is not None:
Expand All @@ -637,7 +550,7 @@ def list_files(self, path: str | None = None) -> list[str]:

def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return as a stream of bytes."""
self._refresh_logs()

try:
params = {'path': path}
with send_request(
Expand Down
53 changes: 32 additions & 21 deletions openhands/runtime/impl/modal/modal_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from openhands.events import EventStream
from openhands.runtime.impl.eventstream.eventstream_runtime import (
EventStreamRuntime,
LogBuffer,
LogStreamer,
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
Expand All @@ -32,24 +32,38 @@ def bytes_shim(string_generator) -> Generator[bytes, None, None]:
yield line.encode('utf-8')


class ModalLogBuffer(LogBuffer):
"""Synchronous buffer for Modal sandbox logs.
class ModalLogStreamer(LogStreamer):
"""Streams Modal sandbox logs to stdout.

This class provides a thread-safe way to collect, store, and retrieve logs
from a Modal sandbox. It uses a list to store log lines and provides methods
for appending, retrieving, and clearing logs.
This class provides a way to stream logs from a Modal sandbox directly to stdout
through the provided logging function.
"""

def __init__(self, sandbox: modal.Sandbox):
self.init_msg = 'Runtime client initialized.'

self.buffer: list[str] = []
self.lock = threading.Lock()
def __init__(
self,
sandbox: modal.Sandbox,
logFn: Callable,
):
self.log = logFn
self._stop_event = threading.Event()
self.log_generator = bytes_shim(sandbox.stderr)
self.log_stream_thread = threading.Thread(target=self.stream_logs)
self.log_stream_thread.daemon = True
self.log_stream_thread.start()

# Start the stdout streaming thread
self.stdout_thread = threading.Thread(target=self._stream_logs)
self.stdout_thread.daemon = True
self.stdout_thread.start()

def _stream_logs(self):
"""Stream logs from the Modal sandbox."""
try:
for log_line in self.log_generator:
if self._stop_event.is_set():
break
if log_line:
decoded_line = log_line.decode('utf-8').rstrip()
self.log('debug', f'[inside sandbox] {decoded_line}')
except Exception as e:
self.log('error', f'Error streaming modal logs: {e}')


class ModalRuntime(EventStreamRuntime):
Expand Down Expand Up @@ -109,7 +123,7 @@ def __init__(
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time

# Buffer for container logs
self.log_buffer: LogBuffer | None = None
self.log_streamer: LogStreamer | None = None

if self.config.sandbox.runtime_extra_deps:
self.log(
Expand Down Expand Up @@ -156,7 +170,7 @@ async def connect(self):

self.send_status_message('STATUS$CONTAINER_STARTED')

self.log_buffer = ModalLogBuffer(self.sandbox)
self.log_streamer = ModalLogStreamer(self.sandbox, self.log)
if self.sandbox is None:
raise Exception('Sandbox not initialized')
tunnel = self.sandbox.tunnels()[self.container_port]
Expand Down Expand Up @@ -278,11 +292,8 @@ def _init_sandbox(

def close(self):
"""Closes the ModalRuntime and associated objects."""
# if self.temp_dir_handler:
# self.temp_dir_handler.__exit__(None, None, None)

if self.log_buffer:
self.log_buffer.close()
if self.log_streamer:
self.log_streamer.close()

if self.session:
self.session.close()
Expand Down
Loading
Loading