Skip to content

Commit

Permalink
Enhancements from 2.4 (NVIDIA#2519)
Browse files Browse the repository at this point in the history
* Starts heartbeat after task is pulled and before task execution (NVIDIA#2415)

* Starts pipe handler heartbeat send/check after task is pulled and before task execution (NVIDIA#2442)

* [2.4] Improve cell pipe timeout handling (NVIDIA#2441)

* improve cell pipe timeout handling

* improved end and abort handling

* improve timeout handling

---------

Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <[email protected]>

* [2.4] Enhance launcher executor (NVIDIA#2433)

* Update LauncherExecutor logs and execution setup timeout

* Change name

* [2.4] Fire and forget for pipe handler control messages (NVIDIA#2413)

* Fire and forget for pipe handler control messages

* Add default timeout value

* fix wait-for-reply (NVIDIA#2478)

* Fix pipe handler timeout in task exchanger and launcher executor (NVIDIA#2495)

* Fix metric relay pipe handler timeout (NVIDIA#2496)

* Rely on launcher check_run_status to pause/resume hb (NVIDIA#2502)

Co-authored-by: Chester Chen <[email protected]>

---------

Co-authored-by: Yan Cheng <[email protected]>
Co-authored-by: Chester Chen <[email protected]>
  • Loading branch information
3 people authored Apr 19, 2024
1 parent bc7d96d commit d4afbee
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 118 deletions.
33 changes: 17 additions & 16 deletions nvflare/app_common/executors/client_api_launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ def __init__(
launch_timeout: Optional[float] = None,
task_wait_timeout: Optional[float] = None,
last_result_transfer_timeout: float = 300.0,
external_execution_wait: float = 5.0,
peer_read_timeout: Optional[float] = None,
external_pre_init_timeout: float = 60.0,
peer_read_timeout: Optional[float] = 60.0,
monitor_interval: float = 0.01,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
heartbeat_timeout: float = 60.0,
workers: int = 4,
train_with_evaluation: bool = True,
train_task_name: str = "train",
Expand All @@ -51,22 +51,23 @@ def __init__(
"""Initializes the ClientAPILauncherExecutor.
Args:
pipe_id (Optional[str]): Identifier for obtaining the Pipe from NVFlare components.
pipe_id (str): Identifier for obtaining the Pipe from NVFlare components.
launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components.
launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout).
task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process.
This value should be greater than the time needed for sending the whole result.
peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout).
monitor_interval (float): Interval for monitoring the launcher (default: 0.01).
read_interval (float): Interval for reading from the pipe (default: 0.5).
heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0).
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0).
workers (int): Number of worker threads needed (default: 4).
train_with_evaluation (bool): Whether to run training with global model evaluation (default: True).
train_task_name (str): Task name of train mode (default: train).
evaluate_task_name (str): Task name of evaluate mode (default: evaluate).
submit_model_task_name (str): Task name of submit_model mode (default: submit_model).
external_pre_init_timeout (float): Time to wait for external process before it calls flare.init().
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
monitor_interval (float): Interval for monitoring the launcher.
read_interval (float): Interval for reading from the pipe.
heartbeat_interval (float): Interval for sending heartbeat to the peer.
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer.
workers (int): Number of worker threads needed.
train_with_evaluation (bool): Whether to run training with global model evaluation.
train_task_name (str): Task name of train mode.
evaluate_task_name (str): Task name of evaluate mode.
submit_model_task_name (str): Task name of submit_model mode.
from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
This ParamsConverter will be called when model is sent from nvflare controller side to executor side.
to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
Expand All @@ -83,7 +84,7 @@ def __init__(
launch_timeout=launch_timeout,
task_wait_timeout=task_wait_timeout,
last_result_transfer_timeout=last_result_transfer_timeout,
external_execution_wait=external_execution_wait,
external_pre_init_timeout=external_pre_init_timeout,
peer_read_timeout=peer_read_timeout,
monitor_interval=monitor_interval,
read_interval=read_interval,
Expand Down
67 changes: 37 additions & 30 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ def __init__(
launch_timeout: Optional[float] = None,
task_wait_timeout: Optional[float] = None,
last_result_transfer_timeout: float = 300.0,
external_execution_wait: float = 5.0,
peer_read_timeout: Optional[float] = None,
monitor_interval: float = 1.0,
external_pre_init_timeout: float = 60.0,
peer_read_timeout: Optional[float] = 60.0,
monitor_interval: float = 0.1,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
workers: int = 1,
heartbeat_timeout: float = 60.0,
workers: int = 4,
train_with_evaluation: bool = True,
train_task_name: str = "train",
evaluate_task_name: str = "evaluate",
Expand All @@ -63,18 +63,19 @@ def __init__(
launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components.
launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout).
task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process.
This value should be greater than the time needed for sending the whole result.
peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout).
monitor_interval (float): Interval for monitoring the launcher (default: 0.01).
read_interval (float): Interval for reading from the pipe (default: 0.5).
heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0).
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0).
workers (int): Number of worker threads needed (default: 1).
train_with_evaluation (bool): Whether to run training with global model evaluation (default: True).
train_task_name (str): Task name of train mode (default: train).
evaluate_task_name (str): Task name of evaluate mode (default: evaluate).
submit_model_task_name (str): Task name of submit_model mode (default: submit_model).
external_pre_init_timeout (float): Time to wait for external process before it calls flare.init().
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
monitor_interval (float): Interval for monitoring the launcher.
read_interval (float): Interval for reading from the pipe.
heartbeat_interval (float): Interval for sending heartbeat to the peer.
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer.
workers (int): Number of worker threads needed.
train_with_evaluation (bool): Whether to run training with global model evaluation.
train_task_name (str): Task name of train mode.
evaluate_task_name (str): Task name of evaluate mode.
submit_model_task_name (str): Task name of submit_model mode.
from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
This ParamsConverter will be called when model is sent from nvflare controller side to executor side.
to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
Expand All @@ -96,7 +97,7 @@ def __init__(
self._launcher_finish = False
self._launcher_finish_time = None
self._last_result_transfer_timeout = last_result_transfer_timeout
self._external_execution_wait = external_execution_wait
self._external_pre_init_timeout = external_pre_init_timeout
self._received_result = Event()
self._job_end = False

Expand Down Expand Up @@ -248,7 +249,6 @@ def _initialize_external_execution(
self.log_error(fl_ctx, "External execution set up failed.")
abort_signal.trigger("External execution set up failed.")
return False
time.sleep(self._external_execution_wait)
return True

def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs) -> Any:
Expand Down Expand Up @@ -276,36 +276,43 @@ def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs
def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal: Signal):
start_time = time.time()
while True:
if self._launch_timeout and time.time() - start_time >= self._launch_timeout:
self.log_error(fl_ctx, f"External execution is not set up within timeout: {self._launch_timeout}")
if self._external_pre_init_timeout and time.time() - start_time >= self._external_pre_init_timeout:
self.log_error(
fl_ctx,
f"External process has not called flare.init within timeout: {self._external_pre_init_timeout}",
)
return False

if abort_signal.triggered:
self.log_info(fl_ctx, "External execution has not called flare.init but abort signal is triggered.")
return False

if self.peer_is_up_or_dead():
return True

if self.launcher.check_run_status(task_name, fl_ctx) != LauncherRunStatus.RUNNING:
run_status = self.launcher.check_run_status(task_name, fl_ctx)
if run_status != LauncherRunStatus.RUNNING:
self.log_info(
fl_ctx, f"External process has not called flare.init and run status becomes {run_status}."
)
return False

time.sleep(0.1)

def _finalize_external_execution(
self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal
) -> bool:
with self._lock:
if self._job_end:
ask_peer_end_success = self.ask_peer_to_end(fl_ctx)
if not ask_peer_end_success:
return False
if self._job_end:
ask_peer_end_success = self.ask_peer_to_end(fl_ctx)
if not ask_peer_end_success:
return False

check_run_status = self._execute_launcher_method_in_thread_executor(
method_name="check_run_status",
task_name=task_name,
fl_ctx=fl_ctx,
)
if check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
if not self._received_result.is_set() and check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
self.log_warning(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}")

self.log_info(fl_ctx, f"Calling stop task ({task_name}).")
Expand Down Expand Up @@ -365,9 +372,6 @@ def _monitor_launcher(self, fl_ctx: FLContext):
if self.launcher is None:
break

if self._current_task is None:
continue

task_name = self._current_task
run_status = self._execute_launcher_method_in_thread_executor(
method_name="check_run_status",
Expand All @@ -381,16 +385,19 @@ def _monitor_launcher(self, fl_ctx: FLContext):
continue

elif run_status == LauncherRunStatus.NOT_RUNNING:
# pause pipe handler because external process is not running
self.pause_pipe_handler()
continue

elif run_status == LauncherRunStatus.RUNNING:
# resume pipe handler when external process is running
self.resume_pipe_handler()
continue

elif (
run_status == LauncherRunStatus.COMPLETE_FAILED or run_status == LauncherRunStatus.COMPLETE_SUCCESS
):
# pause pipe handler because external process is completed
self.pause_pipe_handler()
if not self._launcher_finish:
self._launcher_finish_time = time.time()
Expand Down
17 changes: 9 additions & 8 deletions nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def __init__(
pipe_id: str,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: Optional[float] = 30.0,
heartbeat_timeout: Optional[float] = 60.0,
resend_interval: float = 2.0,
max_resends: Optional[int] = None,
peer_read_timeout: Optional[float] = 5.0,
peer_read_timeout: Optional[float] = 60.0,
task_wait_time: Optional[float] = None,
result_poll_interval: float = 0.5,
pipe_channel_name=PipeChannelName.TASK,
Expand All @@ -48,19 +48,16 @@ def __init__(
Args:
pipe_id (str): component id of pipe.
read_interval (float): how often to read from pipe.
Defaults to 0.5.
heartbeat_interval (float): how often to send heartbeat to peer.
Defaults to 5.0.
heartbeat_timeout (float, optional): how long to wait for a
heartbeat from the peer before treating the peer as dead,
0 means DO NOT check for heartbeat. Defaults to 30.0.
0 means DO NOT check for heartbeat.
resend_interval (float): how often to resend a message if failing to send.
None means no resend. Note that if the pipe does not support resending,
then no resend. Defaults to 2.0.
then no resend.
max_resends (int, optional): max number of resend. None means no limit.
Defaults to None.
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
Defaults to 5.0.
task_wait_time (float, optional): how long to wait for a task to complete.
None means waiting forever. Defaults to None.
result_poll_interval (float): how often to poll task result.
Expand Down Expand Up @@ -145,7 +142,7 @@ def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort
task_id = shareable.get_header(key=FLContextKey.TASK_ID)

# send to peer
self.log_debug(fl_ctx, "sending task to peer ...")
self.log_info(fl_ctx, f"sending task to peer {self.peer_read_timeout=}")
req = Message.new_request(topic=task_name, data=shareable, msg_id=task_id)
start_time = time.time()
has_been_read = self.pipe_handler.send_to_peer(req, timeout=self.peer_read_timeout, abort_signal=abort_signal)
Expand All @@ -156,6 +153,8 @@ def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort
)
return make_reply(ReturnCode.EXECUTION_EXCEPTION)

self.log_info(fl_ctx, f"task {task_name} sent to peer in {time.time()-start_time} secs")

# wait for result
self.log_debug(fl_ctx, "Waiting for result from peer")
start = time.time()
Expand Down Expand Up @@ -213,6 +212,8 @@ def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort
if not self.check_output_shareable(task_name, result, fl_ctx):
self.log_error(fl_ctx, "bad task result from peer")
return make_reply(ReturnCode.EXECUTION_EXCEPTION)

self.log_info(fl_ctx, f"received result of {task_name} from peer in {time.time()-start} secs")
return result
except Exception as ex:
self.log_error(fl_ctx, f"Failed to convert result: {secure_format_exception(ex)}")
Expand Down
4 changes: 1 addition & 3 deletions nvflare/app_common/widgets/metric_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def __init__(
pipe_id: str,
read_interval=0.1,
heartbeat_interval=5.0,
heartbeat_timeout=30.0,
pipe_channel_name=PipeChannelName.METRIC,
event_type: str = ANALYTIC_EVENT_TYPE,
fed_event: bool = True,
Expand All @@ -42,7 +41,6 @@ def __init__(
self.pipe_id = pipe_id
self._read_interval = read_interval
self._heartbeat_interval = heartbeat_interval
self._heartbeat_timeout = heartbeat_timeout
self.pipe_channel_name = pipe_channel_name
self.pipe = None
self.pipe_handler = None
Expand All @@ -64,7 +62,7 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
pipe=self.pipe,
read_interval=self._read_interval,
heartbeat_interval=self._heartbeat_interval,
heartbeat_timeout=self._heartbeat_timeout,
heartbeat_timeout=0,
)
self.pipe_handler.set_status_cb(self._pipe_status_cb)
self.pipe_handler.set_message_cb(self._pipe_msg_cb)
Expand Down
Loading

0 comments on commit d4afbee

Please sign in to comment.