Skip to content

Commit

Permalink
Merge LogStreamer into RunTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
chongshenng committed Jun 17, 2024
1 parent 91e3e21 commit 2219ca4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 19 deletions.
26 changes: 14 additions & 12 deletions src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
StreamLogsResponse,
)

from .executor import Executor, LogStreamer, RunTracker
from .executor import Executor, RunTracker


class ExecServicer(exec_pb2_grpc.ExecServicer):
Expand All @@ -45,7 +45,7 @@ def __init__(self, executor: Executor) -> None:
self.lock = threading.Lock()

self.select_timeout: int = 1
self.log_streams: Dict[int, LogStreamer] = {}
# self.log_streams: Dict[int, LogStreamer] = {}

def StartRun(
self, request: StartRunRequest, context: grpc.ServicerContext
Expand All @@ -59,17 +59,16 @@ def StartRun(
log(ERROR, "Executor failed to start run")
return StartRunResponse()

self.runs[run.run_id] = run

stop_event = threading.Event()
logs: List[str] = []
# Start a background thread to capture the log output
capture_thread = threading.Thread(
target=self._capture_logs, args=(run, stop_event, logs), daemon=True
target=self._capture_logs, args=(run,), daemon=True
)
with self.lock:
self.log_streams[run.run_id] = LogStreamer(
process=run.proc,
self.runs[run.run_id] = RunTracker(
run_id=run.run_id,
proc=run.proc,
stop_event=stop_event,
logs=logs,
capture_thread=capture_thread,
Expand All @@ -79,8 +78,11 @@ def StartRun(
return StartRunResponse(run_id=run.run_id)

def _capture_logs(
self, run: RunTracker, stop_event: threading.Event, logs: List[str]
self,
run: RunTracker,
) -> None:
stop_event = run.stop_event
logs = run.logs
while not stop_event.is_set():
ready_to_read, _, _ = select.select(
[run.proc.stdout, run.proc.stderr],
Expand All @@ -103,7 +105,7 @@ def _capture_logs(
stop_event.set()
break

def StreamLogs(
def StreamLogs( # pylint: disable=C0103
self, request: StreamLogsRequest, context: grpc.ServicerContext
) -> Generator[StreamLogsResponse, Any, None]:
"""Get logs."""
Expand All @@ -112,14 +114,14 @@ def StreamLogs(
last_sent_index = 0
while context.is_active():
with self.lock:
if request.run_id not in self.log_streams:
if request.run_id not in self.runs:
context.abort(grpc.StatusCode.NOT_FOUND, "Run ID not found")
logs = self.log_streams[request.run_id].logs
logs = self.runs[request.run_id].logs
if last_sent_index < len(logs):
for i in range(last_sent_index, len(logs)):
yield StreamLogsResponse(log_output=logs[i])
last_sent_index = len(logs)
if self.log_streams[request.run_id].process.poll() is not None:
if self.runs[request.run_id].proc.poll() is not None:
log(INFO, "Run ID `%s` completed", request.run_id)
context.cancel()
time.sleep(0.1) # Sleep briefly to avoid busy waiting
7 changes: 0 additions & 7 deletions src/py/flwr/superexec/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ class RunTracker:

run_id: int
proc: Popen # type: ignore


@dataclass
class LogStreamer:
"""Represents a logstream for a `run_id`."""

process: Popen # type: ignore
stop_event: threading.Event
logs: List[str]
capture_thread: threading.Thread
Expand Down

0 comments on commit 2219ca4

Please sign in to comment.