diff --git a/openhands/runtime/utils/log_streamer.py b/openhands/runtime/utils/log_streamer.py index 24a28b93f36c..1bb4582c8ea7 100644 --- a/openhands/runtime/utils/log_streamer.py +++ b/openhands/runtime/utils/log_streamer.py @@ -17,16 +17,25 @@ def __init__( logFn: Callable, ): self.log = logFn - self.log_generator = container.logs(stream=True, follow=True) + self.stdout_thread = None + self.log_generator = None self._stop_event = threading.Event() - # Start the stdout streaming thread - self.stdout_thread = threading.Thread(target=self._stream_logs) - self.stdout_thread.daemon = True - self.stdout_thread.start() + try: + self.log_generator = container.logs(stream=True, follow=True) + # Start the stdout streaming thread + self.stdout_thread = threading.Thread(target=self._stream_logs) + self.stdout_thread.daemon = True + self.stdout_thread.start() + except Exception as e: + self.log('error', f'Failed to initialize log streaming: {e}') def _stream_logs(self): """Stream logs from the Docker container to stdout.""" + if not self.log_generator: + self.log('error', 'Log generator not initialized') + return + try: for log_line in self.log_generator: if self._stop_event.is_set(): diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py new file mode 100644 index 000000000000..185f49bb14ab --- /dev/null +++ b/tests/unit/test_log_streamer.py @@ -0,0 +1,53 @@ +import unittest +from unittest.mock import Mock, patch + +from openhands.runtime.utils.log_streamer import LogStreamer + + +class TestLogStreamer(unittest.TestCase): + def setUp(self): + self.mock_container = Mock() + self.mock_log_fn = Mock() + + def test_init_failure_handling(self): + """Test that LogStreamer handles initialization failures gracefully.""" + self.mock_container.logs.side_effect = Exception("Test error") + + streamer = LogStreamer(self.mock_container, self.mock_log_fn) + self.assertIsNone(streamer.stdout_thread) + self.assertIsNone(streamer.log_generator) + self.mock_log_fn.assert_called_with('error', 'Failed to initialize log streaming: Test error') + + def test_stream_logs_without_generator(self): + """Test that _stream_logs handles missing log generator gracefully.""" + streamer = LogStreamer(self.mock_container, self.mock_log_fn) + streamer.log_generator = None + streamer._stream_logs() + self.mock_log_fn.assert_called_with('error', 'Log generator not initialized') + + def test_cleanup_without_thread(self): + """Test that cleanup works even if stdout_thread is not initialized.""" + streamer = LogStreamer(self.mock_container, self.mock_log_fn) + streamer.stdout_thread = None + streamer.close() # Should not raise any exceptions + + def test_normal_operation(self): + """Test normal operation of LogStreamer.""" + mock_logs = [b'test log 1\n', b'test log 2\n'] + self.mock_container.logs.return_value = mock_logs + + streamer = LogStreamer(self.mock_container, self.mock_log_fn) + self.assertIsNotNone(streamer.stdout_thread) + self.assertIsNotNone(streamer.log_generator) + + # Let the thread process the logs + streamer.close() + + # Verify logs were processed + expected_calls = [ + ('debug', '[inside container] test log 1'), + ('debug', '[inside container] test log 2') + ] + actual_calls = [(args[0], args[1]) for args, _ in self.mock_log_fn.call_args_list] + for expected in expected_calls: + self.assertIn(expected, actual_calls)