From ebb78ae705e413718231ffb873846405c88a733a Mon Sep 17 00:00:00 2001
From: Chong Shen Ng
Date: Thu, 19 Sep 2024 12:19:12 +0100
Subject: [PATCH] feat(framework) Add SuperExec logcatcher (#3584)

Signed-off-by: Robert Steiner
Co-authored-by: Robert Steiner
Co-authored-by: Chong Shen Ng
Co-authored-by: Charles Beauville
Co-authored-by: jafermarq
Co-authored-by: Taner Topal
Co-authored-by: Daniel J. Beutel
---
 src/py/flwr/superexec/exec_servicer.py      | 64 ++++++++++++++++++++-
 src/py/flwr/superexec/exec_servicer_test.py | 21 ++++++-
 src/py/flwr/superexec/executor.py           |  3 +-
 3 files changed, 82 insertions(+), 6 deletions(-)

diff --git a/src/py/flwr/superexec/exec_servicer.py b/src/py/flwr/superexec/exec_servicer.py
index 5b729dbc2b8e..8bf384312fca 100644
--- a/src/py/flwr/superexec/exec_servicer.py
+++ b/src/py/flwr/superexec/exec_servicer.py
@@ -15,6 +15,9 @@
 """SuperExec API servicer."""
 
 
+import select
+import threading
+import time
 from collections.abc import Generator
 from logging import ERROR, INFO
 from typing import Any
@@ -33,6 +36,8 @@
 
 from .executor import Executor, RunTracker
 
+SELECT_TIMEOUT = 1  # Timeout for selecting ready-to-read file descriptors (in seconds)
+
 
 class ExecServicer(exec_pb2_grpc.ExecServicer):
     """SuperExec API servicer."""
@@ -59,13 +64,66 @@ def StartRun(
 
         self.runs[run.run_id] = run
 
+        # Start a background thread to capture the log output
+        capture_thread = threading.Thread(
+            target=_capture_logs, args=(run,), daemon=True
+        )
+        capture_thread.start()
+
         return StartRunResponse(run_id=run.run_id)
 
-    def StreamLogs(
+    def StreamLogs(  # pylint: disable=C0103
         self, request: StreamLogsRequest, context: grpc.ServicerContext
     ) -> Generator[StreamLogsResponse, Any, None]:
         """Get logs."""
-        logs = ["a", "b", "c"]
+        log(INFO, "ExecServicer.StreamLogs")
+
+        # Exit if `run_id` not found
+        if request.run_id not in self.runs:
+            context.abort(grpc.StatusCode.NOT_FOUND, "Run ID not found")
+
+        last_sent_index = 0
         while context.is_active():
-            for i in range(len(logs)):  # pylint: disable=C0200
+            # Yield n'th row of logs, if n'th row < len(logs)
+            logs = self.runs[request.run_id].logs
+            for i in range(last_sent_index, len(logs)):
                 yield StreamLogsResponse(log_output=logs[i])
+            last_sent_index = len(logs)
+
+            # Wait for and continue to yield more log responses only if the
+            # run isn't completed yet. If the run is finished, the entire log
+            # is returned at this point and the server ends the stream.
+            if self.runs[request.run_id].proc.poll() is not None:
+                log(INFO, "All logs for run ID `%s` returned", request.run_id)
+                return
+
+            time.sleep(1.0)  # Sleep briefly to avoid busy waiting
+
+
+def _capture_logs(
+    run: RunTracker,
+) -> None:
+    while True:
+        # Explicitly check if Popen.poll() is None. Required for `pytest`.
+        if run.proc.poll() is None:
+            # Select streams only when ready to read
+            ready_to_read, _, _ = select.select(
+                [run.proc.stdout, run.proc.stderr],
+                [],
+                [],
+                SELECT_TIMEOUT,
+            )
+            # Read from std* and append to RunTracker.logs
+            for stream in ready_to_read:
+                line = stream.readline().rstrip()
+                if line:
+                    run.logs.append(f"{line}")
+
+        # Close std* to prevent blocking
+        elif run.proc.poll() is not None:
+            log(INFO, "Subprocess finished, exiting log capture")
+            if run.proc.stdout:
+                run.proc.stdout.close()
+            if run.proc.stderr:
+                run.proc.stderr.close()
+            break
diff --git a/src/py/flwr/superexec/exec_servicer_test.py b/src/py/flwr/superexec/exec_servicer_test.py
index 83717d63a36e..b777bc806fe5 100644
--- a/src/py/flwr/superexec/exec_servicer_test.py
+++ b/src/py/flwr/superexec/exec_servicer_test.py
@@ -16,11 +16,11 @@
 
 
 import subprocess
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, Mock
 
 from flwr.proto.exec_pb2 import StartRunRequest  # pylint: disable=E0611
 
-from .exec_servicer import ExecServicer
+from .exec_servicer import ExecServicer, _capture_logs
 
 
 def test_start_run() -> None:
@@ -50,3 +50,20 @@ def test_start_run() -> None:
     response = servicer.StartRun(request, context_mock)
 
     assert response.run_id == 10
+
+
+def test_capture_logs() -> None:
+    """Test capture_logs function."""
+    run_res = Mock()
+    run_res.logs = []
+    with subprocess.Popen(
+        ["echo", "success"],
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        text=True,
+    ) as proc:
+        run_res.proc = proc
+        _capture_logs(run_res)
+
+    assert len(run_res.logs) == 1
+    assert run_res.logs[0] == "success"
diff --git a/src/py/flwr/superexec/executor.py b/src/py/flwr/superexec/executor.py
index 8d630d108b66..08b66a438e4d 100644
--- a/src/py/flwr/superexec/executor.py
+++ b/src/py/flwr/superexec/executor.py
@@ -15,7 +15,7 @@
 """Execute and monitor a Flower run."""
 
 from abc import ABC, abstractmethod
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from subprocess import Popen
 from typing import Optional
 
@@ -28,6 +28,7 @@ class RunTracker:
 
     run_id: int
     proc: Popen  # type: ignore
+    logs: list[str] = field(default_factory=list)
 
 
 class Executor(ABC):