Skip to content

Commit

Permalink
feat(framework) Add SuperExec logcatcher (#3584)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert Steiner <[email protected]>
Co-authored-by: Robert Steiner <[email protected]>
Co-authored-by: Chong Shen Ng <[email protected]>
Co-authored-by: Charles Beauville <[email protected]>
Co-authored-by: jafermarq <[email protected]>
Co-authored-by: Taner Topal <[email protected]>
Co-authored-by: Daniel J. Beutel <[email protected]>
  • Loading branch information
6 people authored Sep 19, 2024
1 parent a70449d commit ebb78ae
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 6 deletions.
64 changes: 61 additions & 3 deletions src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
"""SuperExec API servicer."""


import select
import threading
import time
from collections.abc import Generator
from logging import ERROR, INFO
from typing import Any
Expand All @@ -33,6 +36,8 @@

from .executor import Executor, RunTracker

SELECT_TIMEOUT = 1 # Timeout for selecting ready-to-read file descriptors (in seconds)


class ExecServicer(exec_pb2_grpc.ExecServicer):
"""SuperExec API servicer."""
Expand All @@ -59,13 +64,66 @@ def StartRun(

self.runs[run.run_id] = run

# Start a background thread to capture the log output
capture_thread = threading.Thread(
target=_capture_logs, args=(run,), daemon=True
)
capture_thread.start()

return StartRunResponse(run_id=run.run_id)

def StreamLogs(
def StreamLogs( # pylint: disable=C0103
self, request: StreamLogsRequest, context: grpc.ServicerContext
) -> Generator[StreamLogsResponse, Any, None]:
"""Get logs."""
logs = ["a", "b", "c"]
log(INFO, "ExecServicer.StreamLogs")

# Exit if `run_id` not found
if request.run_id not in self.runs:
context.abort(grpc.StatusCode.NOT_FOUND, "Run ID not found")

last_sent_index = 0
while context.is_active():
for i in range(len(logs)): # pylint: disable=C0200
# Yield n'th row of logs, if n'th row < len(logs)
logs = self.runs[request.run_id].logs
for i in range(last_sent_index, len(logs)):
yield StreamLogsResponse(log_output=logs[i])
last_sent_index = len(logs)

# Wait for and continue to yield more log responses only if the
# run isn't completed yet. If the run is finished, the entire log
# is returned at this point and the server ends the stream.
if self.runs[request.run_id].proc.poll() is not None:
log(INFO, "All logs for run ID `%s` returned", request.run_id)
return

time.sleep(1.0) # Sleep briefly to avoid busy waiting


def _capture_logs(
run: RunTracker,
) -> None:
while True:
# Explicitly check if Popen.poll() is None. Required for `pytest`.
if run.proc.poll() is None:
# Select streams only when ready to read
ready_to_read, _, _ = select.select(
[run.proc.stdout, run.proc.stderr],
[],
[],
SELECT_TIMEOUT,
)
# Read from std* and append to RunTracker.logs
for stream in ready_to_read:
line = stream.readline().rstrip()
if line:
run.logs.append(f"{line}")

# Close std* to prevent blocking
elif run.proc.poll() is not None:
log(INFO, "Subprocess finished, exiting log capture")
if run.proc.stdout:
run.proc.stdout.close()
if run.proc.stderr:
run.proc.stderr.close()
break
21 changes: 19 additions & 2 deletions src/py/flwr/superexec/exec_servicer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@


import subprocess
from unittest.mock import MagicMock
from unittest.mock import MagicMock, Mock

from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611

from .exec_servicer import ExecServicer
from .exec_servicer import ExecServicer, _capture_logs


def test_start_run() -> None:
Expand Down Expand Up @@ -50,3 +50,20 @@ def test_start_run() -> None:
response = servicer.StartRun(request, context_mock)

assert response.run_id == 10


def test_capture_logs() -> None:
"""Test capture_logs function."""
run_res = Mock()
run_res.logs = []
with subprocess.Popen(
["echo", "success"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) as proc:
run_res.proc = proc
_capture_logs(run_res)

assert len(run_res.logs) == 1
assert run_res.logs[0] == "success"
3 changes: 2 additions & 1 deletion src/py/flwr/superexec/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Execute and monitor a Flower run."""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from dataclasses import dataclass, field
from subprocess import Popen
from typing import Optional

Expand All @@ -28,6 +28,7 @@ class RunTracker:

run_id: int
proc: Popen # type: ignore
logs: list[str] = field(default_factory=list)


class Executor(ABC):
Expand Down

0 comments on commit ebb78ae

Please sign in to comment.