diff --git a/nvflare/app_common/executors/client_api_launcher_executor.py b/nvflare/app_common/executors/client_api_launcher_executor.py
index f15dad07b8..7cb631b4d5 100644
--- a/nvflare/app_common/executors/client_api_launcher_executor.py
+++ b/nvflare/app_common/executors/client_api_launcher_executor.py
@@ -31,12 +31,12 @@ def __init__(
         launch_timeout: Optional[float] = None,
         task_wait_timeout: Optional[float] = None,
         last_result_transfer_timeout: float = 300.0,
-        external_execution_wait: float = 5.0,
-        peer_read_timeout: Optional[float] = None,
+        external_pre_init_timeout: float = 60.0,
+        peer_read_timeout: Optional[float] = 60.0,
         monitor_interval: float = 0.01,
         read_interval: float = 0.5,
         heartbeat_interval: float = 5.0,
-        heartbeat_timeout: float = 30.0,
+        heartbeat_timeout: float = 60.0,
         workers: int = 4,
         train_with_evaluation: bool = True,
         train_task_name: str = "train",
@@ -51,22 +51,23 @@ def __init__(
         """Initializes the ClientAPILauncherExecutor.

         Args:
-            pipe_id (Optional[str]): Identifier for obtaining the Pipe from NVFlare components.
+            pipe_id (str): Identifier for obtaining the Pipe from NVFlare components.
             launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components.
             launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout).
             task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout).
-            last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0).
+            last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process.
                 This value should be greater than the time needed for sending the whole result.
-            peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout).
-            monitor_interval (float): Interval for monitoring the launcher (default: 0.01).
-            read_interval (float): Interval for reading from the pipe (default: 0.5).
-            heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0).
-            heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0).
-            workers (int): Number of worker threads needed (default: 4).
-            train_with_evaluation (bool): Whether to run training with global model evaluation (default: True).
-            train_task_name (str): Task name of train mode (default: train).
-            evaluate_task_name (str): Task name of evaluate mode (default: evaluate).
-            submit_model_task_name (str): Task name of submit_model mode (default: submit_model).
+            external_pre_init_timeout (float): Time to wait for the external process to call flare.init().
+            peer_read_timeout (float, optional): Time to wait for the peer to accept a sent message.
+            monitor_interval (float): Interval for monitoring the launcher.
+            read_interval (float): Interval for reading from the pipe.
+            heartbeat_interval (float): Interval for sending heartbeats to the peer.
+            heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer.
+            workers (int): Number of worker threads needed.
+            train_with_evaluation (bool): Whether to run training with global model evaluation.
+            train_task_name (str): Task name of train mode.
+            evaluate_task_name (str): Task name of evaluate mode.
+            submit_model_task_name (str): Task name of submit_model mode.
             from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
                This ParamsConverter will be called when model is sent from nvflare controller side to executor side.
             to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
@@ -83,7 +84,7 @@ def __init__(
             launch_timeout=launch_timeout,
             task_wait_timeout=task_wait_timeout,
             last_result_transfer_timeout=last_result_transfer_timeout,
-            external_execution_wait=external_execution_wait,
+            external_pre_init_timeout=external_pre_init_timeout,
             peer_read_timeout=peer_read_timeout,
             monitor_interval=monitor_interval,
             read_interval=read_interval,
diff --git a/nvflare/app_common/executors/launcher_executor.py b/nvflare/app_common/executors/launcher_executor.py
index d4dfe964df..db181d5279 100644
--- a/nvflare/app_common/executors/launcher_executor.py
+++ b/nvflare/app_common/executors/launcher_executor.py
@@ -42,13 +42,13 @@ def __init__(
         launch_timeout: Optional[float] = None,
         task_wait_timeout: Optional[float] = None,
         last_result_transfer_timeout: float = 300.0,
-        external_execution_wait: float = 5.0,
-        peer_read_timeout: Optional[float] = None,
-        monitor_interval: float = 1.0,
+        external_pre_init_timeout: float = 60.0,
+        peer_read_timeout: Optional[float] = 60.0,
+        monitor_interval: float = 0.1,
         read_interval: float = 0.5,
         heartbeat_interval: float = 5.0,
-        heartbeat_timeout: float = 30.0,
-        workers: int = 1,
+        heartbeat_timeout: float = 60.0,
+        workers: int = 4,
         train_with_evaluation: bool = True,
         train_task_name: str = "train",
         evaluate_task_name: str = "evaluate",
@@ -63,18 +63,19 @@ def __init__(
             launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components.
             launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout).
             task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout).
-            last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0).
+            last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process.
                 This value should be greater than the time needed for sending the whole result.
-            peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout).
-            monitor_interval (float): Interval for monitoring the launcher (default: 0.01).
-            read_interval (float): Interval for reading from the pipe (default: 0.5).
-            heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0).
-            heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0).
-            workers (int): Number of worker threads needed (default: 1).
-            train_with_evaluation (bool): Whether to run training with global model evaluation (default: True).
-            train_task_name (str): Task name of train mode (default: train).
-            evaluate_task_name (str): Task name of evaluate mode (default: evaluate).
-            submit_model_task_name (str): Task name of submit_model mode (default: submit_model).
+            external_pre_init_timeout (float): Time to wait for the external process to call flare.init().
+            peer_read_timeout (float, optional): Time to wait for the peer to accept a sent message.
+            monitor_interval (float): Interval for monitoring the launcher.
+            read_interval (float): Interval for reading from the pipe.
+            heartbeat_interval (float): Interval for sending heartbeats to the peer.
+            heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer.
+            workers (int): Number of worker threads needed.
+            train_with_evaluation (bool): Whether to run training with global model evaluation.
+            train_task_name (str): Task name of train mode.
+            evaluate_task_name (str): Task name of evaluate mode.
+            submit_model_task_name (str): Task name of submit_model mode.
             from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
                This ParamsConverter will be called when model is sent from nvflare controller side to executor side.
             to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
@@ -96,7 +97,7 @@ def __init__(
         self._launcher_finish = False
         self._launcher_finish_time = None
         self._last_result_transfer_timeout = last_result_transfer_timeout
-        self._external_execution_wait = external_execution_wait
+        self._external_pre_init_timeout = external_pre_init_timeout
         self._received_result = Event()
         self._job_end = False

@@ -248,7 +249,6 @@ def _initialize_external_execution(
             self.log_error(fl_ctx, "External execution set up failed.")
             abort_signal.trigger("External execution set up failed.")
             return False
-        time.sleep(self._external_execution_wait)
         return True

     def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs) -> Any:
@@ -276,17 +276,25 @@ def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs
     def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal: Signal):
         start_time = time.time()
         while True:
-            if self._launch_timeout and time.time() - start_time >= self._launch_timeout:
-                self.log_error(fl_ctx, f"External execution is not set up within timeout: {self._launch_timeout}")
+            if self._external_pre_init_timeout and time.time() - start_time >= self._external_pre_init_timeout:
+                self.log_error(
+                    fl_ctx,
+                    f"External process has not called flare.init within timeout: {self._external_pre_init_timeout}",
+                )
                 return False

             if abort_signal.triggered:
+                self.log_info(fl_ctx, "External process has not called flare.init, but abort signal is triggered.")
                 return False

             if self.peer_is_up_or_dead():
                 return True

-            if self.launcher.check_run_status(task_name, fl_ctx) != LauncherRunStatus.RUNNING:
+            run_status = self.launcher.check_run_status(task_name, fl_ctx)
+            if run_status != LauncherRunStatus.RUNNING:
+                self.log_info(
+                    fl_ctx, f"External process has not called flare.init and run status is now {run_status}."
+                )
                 return False

             time.sleep(0.1)
@@ -294,18 +302,17 @@ def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal:
     def _finalize_external_execution(
         self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal
     ) -> bool:
-        with self._lock:
-            if self._job_end:
-                ask_peer_end_success = self.ask_peer_to_end(fl_ctx)
-                if not ask_peer_end_success:
-                    return False
+        if self._job_end:
+            ask_peer_end_success = self.ask_peer_to_end(fl_ctx)
+            if not ask_peer_end_success:
+                return False

         check_run_status = self._execute_launcher_method_in_thread_executor(
             method_name="check_run_status",
             task_name=task_name,
             fl_ctx=fl_ctx,
         )
-        if check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
+        if not self._received_result.is_set() and check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
             self.log_warning(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}")

         self.log_info(fl_ctx, f"Calling stop task ({task_name}).")
@@ -365,9 +372,6 @@ def _monitor_launcher(self, fl_ctx: FLContext):
                 if self.launcher is None:
                     break

-                if self._current_task is None:
-                    continue
-
                 task_name = self._current_task
                 run_status = self._execute_launcher_method_in_thread_executor(
                     method_name="check_run_status",
@@ -381,16 +385,19 @@ def _monitor_launcher(self, fl_ctx: FLContext):
                     continue

                 elif run_status == LauncherRunStatus.NOT_RUNNING:
+                    # pause the pipe handler because the external process is not running
                     self.pause_pipe_handler()
                     continue

                 elif run_status == LauncherRunStatus.RUNNING:
+                    # resume the pipe handler when the external process is running
                     self.resume_pipe_handler()
                     continue

                 elif (
                     run_status == LauncherRunStatus.COMPLETE_FAILED or run_status == LauncherRunStatus.COMPLETE_SUCCESS
                 ):
+                    # pause the pipe handler because the external process has completed
                     self.pause_pipe_handler()
                     if not self._launcher_finish:
                         self._launcher_finish_time = time.time()
diff --git a/nvflare/app_common/executors/task_exchanger.py b/nvflare/app_common/executors/task_exchanger.py
index e9053629a7..3d4eddf200 100644
--- a/nvflare/app_common/executors/task_exchanger.py
+++ b/nvflare/app_common/executors/task_exchanger.py
@@ -35,10 +35,10 @@ def __init__(
         pipe_id: str,
         read_interval: float = 0.5,
         heartbeat_interval: float = 5.0,
-        heartbeat_timeout: Optional[float] = 30.0,
+        heartbeat_timeout: Optional[float] = 60.0,
         resend_interval: float = 2.0,
         max_resends: Optional[int] = None,
-        peer_read_timeout: Optional[float] = 5.0,
+        peer_read_timeout: Optional[float] = 60.0,
         task_wait_time: Optional[float] = None,
         result_poll_interval: float = 0.5,
         pipe_channel_name=PipeChannelName.TASK,
@@ -48,19 +48,16 @@ def __init__(
         Args:
             pipe_id (str): component id of pipe.
             read_interval (float): how often to read from pipe.
-                Defaults to 0.5.
             heartbeat_interval (float): how often to send heartbeat to peer.
-                Defaults to 5.0.
             heartbeat_timeout (float, optional): how long to wait for a
                 heartbeat from the peer before treating the peer as dead,
-                0 means DO NOT check for heartbeat. Defaults to 30.0.
+                0 means DO NOT check for heartbeat.
             resend_interval (float): how often to resend a message if failing to send.
                 None means no resend. Note that if the pipe does not support resending,
-                then no resend. Defaults to 2.0.
+                then no resend.
             max_resends (int, optional): max number of resend. None means no limit.
                 Defaults to None.
             peer_read_timeout (float, optional): time to wait for peer to accept sent message.
-                Defaults to 5.0.
             task_wait_time (float, optional): how long to wait for a task to complete.
                 None means waiting forever. Defaults to None.
             result_poll_interval (float): how often to poll task result.
@@ -145,7 +142,7 @@ def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort
         task_id = shareable.get_header(key=FLContextKey.TASK_ID)

         # send to peer
-        self.log_debug(fl_ctx, "sending task to peer ...")
+        self.log_info(fl_ctx, f"sending task to peer {self.peer_read_timeout=}")
         req = Message.new_request(topic=task_name, data=shareable, msg_id=task_id)
         start_time = time.time()
         has_been_read = self.pipe_handler.send_to_peer(req, timeout=self.peer_read_timeout, abort_signal=abort_signal)
@@ -156,6 +153,8 @@ def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort
             )
             return make_reply(ReturnCode.EXECUTION_EXCEPTION)

+        self.log_info(fl_ctx, f"task {task_name} sent to peer in {time.time()-start_time} secs")
+
         # wait for result
         self.log_debug(fl_ctx, "Waiting for result from peer")
         start = time.time()
@@ -213,6 +212,8 @@ def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort
                 if not self.check_output_shareable(task_name, result, fl_ctx):
                     self.log_error(fl_ctx, "bad task result from peer")
                     return make_reply(ReturnCode.EXECUTION_EXCEPTION)
+
+                self.log_info(fl_ctx, f"received result of {task_name} from peer in {time.time()-start} secs")
                 return result
             except Exception as ex:
                 self.log_error(fl_ctx, f"Failed to convert result: {secure_format_exception(ex)}")
diff --git a/nvflare/app_common/widgets/metric_relay.py b/nvflare/app_common/widgets/metric_relay.py
index f896fc6a6b..cf321a59d4 100644
--- a/nvflare/app_common/widgets/metric_relay.py
+++ b/nvflare/app_common/widgets/metric_relay.py
@@ -33,7 +33,6 @@ def __init__(
         pipe_id: str,
         read_interval=0.1,
         heartbeat_interval=5.0,
-        heartbeat_timeout=30.0,
         pipe_channel_name=PipeChannelName.METRIC,
         event_type: str = ANALYTIC_EVENT_TYPE,
         fed_event: bool = True,
@@ -42,7 +41,6 @@ def __init__(
         self.pipe_id = pipe_id
         self._read_interval = read_interval
         self._heartbeat_interval = heartbeat_interval
-        self._heartbeat_timeout = heartbeat_timeout
         self.pipe_channel_name = pipe_channel_name
         self.pipe = None
         self.pipe_handler = None
@@ -64,7 +62,7 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
                 pipe=self.pipe,
                 read_interval=self._read_interval,
                 heartbeat_interval=self._heartbeat_interval,
-                heartbeat_timeout=self._heartbeat_timeout,
+                heartbeat_timeout=0,
             )
             self.pipe_handler.set_status_cb(self._pipe_status_cb)
             self.pipe_handler.set_message_cb(self._pipe_msg_cb)
diff --git a/nvflare/fuel/f3/cellnet/cell.py b/nvflare/fuel/f3/cellnet/cell.py
index 07f7a72c68..723b894f75 100644
--- a/nvflare/fuel/f3/cellnet/cell.py
+++ b/nvflare/fuel/f3/cellnet/cell.py
@@ -27,6 +27,7 @@
 from nvflare.fuel.f3.streaming.stream_const import StreamHeaderKey
 from nvflare.fuel.f3.streaming.stream_types import StreamFuture
 from nvflare.private.defs import CellChannel
+from nvflare.security.logging import secure_format_exception

 CHANNELS_TO_EXCLUDE = (
     CellChannel.CLIENT_MAIN,
@@ -233,15 +234,21 @@ def _get_result(self, req_id):
         return waiter.result

     def _future_wait(self, future, timeout):
+        # future could have an error!
         last_progress = 0
         while not future.waiter.wait(timeout):
+            if future.error:
+                return False
             current_progress = future.get_progress()
             if last_progress == current_progress:
                 return False
             else:
                 self.logger.debug(f"{current_progress=}")
                 last_progress = current_progress
-        return True
+        if future.error:
+            return False
+        else:
+            return True

     def _encode_message(self, msg: Message):
         try:
@@ -250,11 +257,43 @@ def _encode_message(self, msg: Message):
             self.logger.error(f"Can't encode {msg=} {exc=}")
             raise exc

-    def _send_request(self, channel, target, topic, request, timeout=10.0, secure=False, optional=False):
+    def _send_request(
+        self,
+        channel,
+        target,
+        topic,
+        request,
+        timeout=10.0,
+        secure=False,
+        optional=False,
+    ):
+        """Stream one request to the target
+
+        Args:
+            channel: message channel name
+            target: FQCN of the target cell
+            topic: topic of the message
+            request: request message
+            timeout: how long to wait
+            secure: is P2P security to be applied
+            optional: is the message optional
+
+        Returns: reply data
+
+        """
         self._encode_message(request)
         return self._send_one_request(channel, target, topic, request, timeout, secure, optional)

-    def _send_one_request(self, channel, target, topic, request, timeout=10.0, secure=False, optional=False):
+    def _send_one_request(
+        self,
+        channel,
+        target,
+        topic,
+        request,
+        timeout=10.0,
+        secure=False,
+        optional=False,
+    ):
         req_id = str(uuid.uuid4())
         request.add_headers({StreamHeaderKey.STREAM_REQ_ID: req_id})

@@ -263,42 +302,46 @@ def _send_one_request(self, channel, target, topic, request, timeout=10.0, secur
         waiter = SimpleWaiter(req_id=req_id, result=make_reply(ReturnCode.TIMEOUT))
         self.requests_dict[req_id] = waiter

-        future = self.send_blob(
-            channel=channel, topic=topic, target=target, message=request, secure=secure, optional=optional
-        )
-
-        self.logger.debug(f"{req_id=}: Waiting starts")
-        # Three stages, sending, waiting for receiving first byte, receiving
-
-        # sending with progress timeout
-        self.logger.debug(f"{req_id=}: entering sending wait {timeout=}")
-        sending_complete = self._future_wait(future, timeout)
-        if not sending_complete:
-            self.logger.info(f"{req_id=}: sending timeout {timeout=}")
-            return self._get_result(req_id)
-        self.logger.debug(f"{req_id=}: sending complete")
+        try:
+            future = self.send_blob(
+                channel=channel, topic=topic, target=target, message=request, secure=secure, optional=optional
+            )

-        # waiting for receiving first byte
-        self.logger.debug(f"{req_id=}: entering remote process wait {timeout=}")
-        if not waiter.in_receiving.wait(timeout):
-            self.logger.info(f"{req_id=}: remote processing timeout {timeout=}")
+            self.logger.debug(f"{req_id=}: Waiting starts")
+
+            # Three stages, sending, waiting for receiving first byte, receiving
+            # sending with progress timeout
+            self.logger.debug(f"{req_id=}: entering sending wait {timeout=}")
+            sending_complete = self._future_wait(future, timeout)
+            if not sending_complete:
+                self.logger.debug(f"{req_id=}: sending timeout {timeout=}")
+                return self._get_result(req_id)
+
+            self.logger.debug(f"{req_id=}: sending complete")
+
+            # waiting for receiving first byte
+            self.logger.debug(f"{req_id=}: entering remote process wait {timeout=}")
+            if not waiter.in_receiving.wait(timeout):
+                self.logger.debug(f"{req_id=}: remote processing timeout {timeout=}")
+                return self._get_result(req_id)
+            self.logger.debug(f"{req_id=}: in receiving")
+
+            # receiving with progress timeout
+            r_future = waiter.receiving_future
+            self.logger.debug(f"{req_id=}: entering receiving wait {timeout=}")
+            receiving_complete = self._future_wait(r_future, timeout)
+            if not receiving_complete:
+                self.logger.info(f"{req_id=}: receiving timeout {timeout=}")
+                return self._get_result(req_id)
+            self.logger.debug(f"{req_id=}: receiving complete")
+            waiter.result = Message(r_future.headers, r_future.result())
+            decode_payload(waiter.result, encoding_key=StreamHeaderKey.PAYLOAD_ENCODING)
+            self.logger.debug(f"{req_id=}: return result {waiter.result=}")
             return self._get_result(req_id)
-        self.logger.debug(f"{req_id=}: in receiving")
-
-        # receiving with progress timeout
-        r_future = waiter.receiving_future
-        self.logger.debug(f"{req_id=}: entering receiving wait {timeout=}")
-        receiving_complete = self._future_wait(r_future, timeout)
-        if not receiving_complete:
-            self.logger.info(f"{req_id=}: receiving timeout {timeout=}")
+        except Exception as ex:
+            self.logger.error(f"exception sending request: {secure_format_exception(ex)}")
             return self._get_result(req_id)
-        self.logger.debug(f"{req_id=}: receiving complete")
-        waiter.result = Message(r_future.headers, r_future.result())
-        decode_payload(waiter.result, encoding_key=StreamHeaderKey.PAYLOAD_ENCODING)
-        self.logger.debug(f"{req_id=}: return result {waiter.result=}")
-        result = self._get_result(req_id)
-        return result

     def _process_reply(self, future: StreamFuture):
         headers = future.headers
diff --git a/nvflare/fuel/utils/pipe/cell_pipe.py b/nvflare/fuel/utils/pipe/cell_pipe.py
index ab95ec437e..56f6716454 100644
--- a/nvflare/fuel/utils/pipe/cell_pipe.py
+++ b/nvflare/fuel/utils/pipe/cell_pipe.py
@@ -15,6 +15,7 @@
 import logging
 import queue
 import threading
+import time
 from typing import Tuple, Union

 from nvflare.fuel.f3.cellnet.cell import Cell
@@ -36,6 +37,8 @@
 _HEADER_MSG_TYPE = _PREFIX + "msg_type"
 _HEADER_MSG_ID = _PREFIX + "msg_id"
 _HEADER_REQ_ID = _PREFIX + "req_id"
+_HEADER_START_TIME = _PREFIX + "start"
+_HEADER_HB_SEQ = _PREFIX + "hb_seq"


 def _cell_fqcn(mode, site_name, token):
@@ -46,8 +49,10 @@ def _cell_fqcn(mode, site_name, token):
         return f"{site_name}_{token}_{mode}"


-def _to_cell_message(msg: Message) -> CellMessage:
-    headers = {_HEADER_MSG_TYPE: msg.msg_type, _HEADER_MSG_ID: msg.msg_id}
+def _to_cell_message(msg: Message, extra=None) -> CellMessage:
+    headers = {_HEADER_MSG_TYPE: msg.msg_type, _HEADER_MSG_ID: msg.msg_id, _HEADER_START_TIME: time.time()}
+    if extra:
+        headers.update(extra)
     if msg.req_id:
         headers[_HEADER_REQ_ID] = msg.req_id

@@ -202,12 +207,29 @@ def __init__(
         self.channel = None  # the cellnet message channel
         self.pipe_lock = threading.Lock()  # used to ensure no msg to be sent after closed
         self.closed = False
+        self.last_peer_active_time = 0.0
+        self.hb_seq = 1
+
+    def _update_peer_active_time(self, msg: CellMessage, ch_name: str, msg_type: str):
+        origin = msg.get_header(MessageHeaderKey.ORIGIN)
+        if origin == self.peer_fqcn:
+            self.logger.debug(f"{time.time()}: _update_peer_active_time: {ch_name=} {msg_type=} {msg.headers}")
+            self.last_peer_active_time = time.time()
+
+    def get_last_peer_active_time(self):
+        return self.last_peer_active_time

     def set_cell_cb(self, channel_name: str):
         # This allows multiple pipes over the same cell (e.g. one channel for tasks, another for metrics),
         # as long as different pipes use different cell message channels
         self.channel = f"{_PREFIX}{channel_name}"
         self.cell.register_request_cb(channel=self.channel, topic="*", cb=self._receive_message)
+        self.cell.core_cell.add_incoming_request_filter(
+            channel="*", topic="*", cb=self._update_peer_active_time, ch_name=channel_name, msg_type="req"
+        )
+        self.cell.core_cell.add_incoming_reply_filter(
+            channel="*", topic="*", cb=self._update_peer_active_time, ch_name=channel_name, msg_type="reply"
+        )
         self.logger.info(f"registered CellPipe request CB for {self.channel}")

     def send(self, msg: Message, timeout=None) -> bool:
@@ -225,31 +247,51 @@ def send(self, msg: Message, timeout=None) -> bool:
             if self.closed:
                 raise BrokenPipeError("pipe closed")

-            optional = False
-            if msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]:
-                optional = True
+        # Note: the following code must not be within the lock scope.
+        # Otherwise, only one message could be sent at a time!
+        optional = False
+        if msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]:
+            optional = True
+
+        if not timeout and msg.topic in [Topic.END, Topic.ABORT]:
+            timeout = 5.0  # need to keep the connection for some time; otherwise the msg may not go out
+
+        if msg.topic == Topic.HEARTBEAT:
+            # for debugging purposes
+            extra_headers = {_HEADER_HB_SEQ: self.hb_seq}
+            self.hb_seq += 1

-            reply = self.cell.send_request(
+            # don't need to wait for reply!
+            self.cell.fire_and_forget(
                 channel=self.channel,
                 topic=msg.topic,
-                target=self.peer_fqcn,
-                request=_to_cell_message(msg),
-                timeout=timeout,
+                targets=[self.peer_fqcn],
+                message=_to_cell_message(msg, extra_headers),
                 optional=optional,
             )
-            if reply:
-                rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
-                if rc == ReturnCode.OK:
-                    return True
-                else:
-                    err = f"failed to send '{msg.topic}' to '{self.peer_fqcn}' in channel '{self.channel}': {rc}"
-                    if optional:
-                        self.logger.debug(err)
-                    else:
-                        self.logger.error(err)
-                    return False
+            return True
+
+        reply = self.cell.send_request(
+            channel=self.channel,
+            topic=msg.topic,
+            target=self.peer_fqcn,
+            request=_to_cell_message(msg),
+            timeout=timeout,
+            optional=optional,
+        )
+        if reply:
+            rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
+            if rc == ReturnCode.OK:
+                return True
             else:
+                err = f"failed to send '{msg.topic}' to '{self.peer_fqcn}' in channel '{self.channel}': {rc}"
+                if optional:
+                    self.logger.debug(err)
+                else:
+                    self.logger.error(err)
                 return False
+        else:
+            return False

     def _receive_message(self, request: CellMessage) -> Union[None, CellMessage]:
         sender = request.get_header(MessageHeaderKey.ORIGIN)
diff --git a/nvflare/fuel/utils/pipe/file_pipe.py b/nvflare/fuel/utils/pipe/file_pipe.py
index 4b5ceb19bd..da211aa442 100644
--- a/nvflare/fuel/utils/pipe/file_pipe.py
+++ b/nvflare/fuel/utils/pipe/file_pipe.py
@@ -22,7 +22,7 @@
 from nvflare.fuel.utils.pipe.file_accessor import FileAccessor
 from nvflare.fuel.utils.pipe.file_name_utils import file_name_to_message, message_to_file_name
 from nvflare.fuel.utils.pipe.fobs_file_accessor import FobsFileAccessor
-from nvflare.fuel.utils.pipe.pipe import Message, Pipe
+from nvflare.fuel.utils.pipe.pipe import Message, Pipe, Topic
 from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number, check_str


@@ -260,6 +260,10 @@ def send(self, msg: Message, timeout=None) -> bool:
         """
         if not self.pipe_path:
             raise BrokenPipeError("pipe is not open")
+
+        if not timeout and msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]:
+            timeout = 5.0
+
         return self.put_f(msg, timeout)

     def receive(self, timeout=None):
diff --git a/nvflare/fuel/utils/pipe/pipe.py b/nvflare/fuel/utils/pipe/pipe.py
index c4aeb81b3e..b928b01b5b 100644
--- a/nvflare/fuel/utils/pipe/pipe.py
+++ b/nvflare/fuel/utils/pipe/pipe.py
@@ -140,6 +140,14 @@ def can_resend(self) -> bool:
         """Whether the pipe is able to resend a message."""
         pass

+    def get_last_peer_active_time(self):
+        """Get the last time that the peer is known to be active.
+
+        Returns: the last time that the peer is known to be active; or 0 if this info is not available
+
+        """
+        return 0
+
     def export(self, export_mode: str) -> Tuple[str, dict]:
         if export_mode == ExportMode.SELF:
             mode = self.mode
diff --git a/nvflare/fuel/utils/pipe/pipe_handler.py b/nvflare/fuel/utils/pipe/pipe_handler.py
index 4826c0bfa4..efbbcee134 100644
--- a/nvflare/fuel/utils/pipe/pipe_handler.py
+++ b/nvflare/fuel/utils/pipe/pipe_handler.py
@@ -61,7 +61,7 @@ def __init__(
         heartbeat_interval=5.0,
         heartbeat_timeout=30.0,
         resend_interval=2.0,
-        max_resends=None,
+        max_resends=5,
         default_request_timeout=5.0,
     ):
         """Constructor of the PipeHandler.
@@ -166,6 +166,7 @@ def set_message_cb(self, cb, *args, **kwargs):
     def _send_to_pipe(self, msg: Message, timeout=None, abort_signal: Signal = None):
         pipe = self.pipe
         if not pipe:
+            self.logger.error("cannot send message to pipe since it's already closed")
             return False

         if not timeout or not pipe.can_resend() or not self.resend_interval:
@@ -181,6 +182,7 @@ def _send_to_pipe(self, msg: Message, timeout=None, abort_signal: Signal = None)
                 return sent

             if self.max_resends is not None and num_sends > self.max_resends:
+                self.logger.error(f"abort sending after {num_sends} tries")
                 return False

             if self.asked_to_stop:
@@ -208,10 +210,10 @@ def start(self):
         """Starts the PipeHandler.
         Note: before calling this method, the pipe managed by this PipeHandler must have been opened.
         """
-        if not self.reader.is_alive():
+        if self.reader and not self.reader.is_alive():
             self.reader.start()

-        if not self.heartbeat_sender.is_alive():
+        if self.heartbeat_sender and not self.heartbeat_sender.is_alive():
             self.heartbeat_sender.start()

     def stop(self, close_pipe=True):
@@ -256,7 +258,8 @@ def notify_end(self, data):
         p = self.pipe
         if p:
             try:
-                p.send(self._make_event_message(Topic.END, data))
+                # fire and forget
+                p.send(self._make_event_message(Topic.END, data), 0.1)
             except Exception as ex:
                 self.logger.debug(f"exception notify_end: {secure_format_exception(ex)}")

@@ -265,7 +268,8 @@ def notify_abort(self, data):
         p = self.pipe
         if p:
             try:
-                p.send(self._make_event_message(Topic.ABORT, data))
+                # fire and forget
+                p.send(self._make_event_message(Topic.ABORT, data), 0.1)
             except Exception as ex:
                 self.logger.debug(f"exception notify_abort: {secure_format_exception(ex)}")

@@ -310,6 +314,11 @@ def _try_read(self):
                     break
             else:
                 # is peer gone?
+                # ask the pipe for the last known active time of the peer
+                last_peer_active_time = self.pipe.get_last_peer_active_time()
+                if last_peer_active_time > self._last_heartbeat_received_time:
+                    self._last_heartbeat_received_time = last_peer_active_time
+
                 if (
                     self.heartbeat_timeout
                     and now - self._last_heartbeat_received_time > self.heartbeat_timeout
@@ -339,6 +348,7 @@ def _heartbeat(self):
                 last_heartbeat_sent_time = now

             time.sleep(self._check_interval)
+        self.heartbeat_sender = None

     def get_next(self) -> Optional[Message]:
         """Gets the next message from the message queue.