From 2232f50dc00e0a395d4b33bb79b10fc8bbd310a9 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Sun, 31 Mar 2024 13:19:01 +0100 Subject: [PATCH] add wait_function --- src/py/flwr/client/app.py | 2 +- .../flwr/client/grpc_client/connection_test.py | 2 +- src/py/flwr/common/retry_invoker.py | 17 +++++++++++++---- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index d4bd8e2e39e9..1100463e68d8 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -397,7 +397,7 @@ def _load_client_app() -> ClientApp: ) retry_invoker = RetryInvoker( - wait_factory=exponential, + wait_time_gen_factory=exponential, recoverable_exceptions=connection_error_type, max_tries=max_retries, max_time=max_wait_time, diff --git a/src/py/flwr/client/grpc_client/connection_test.py b/src/py/flwr/client/grpc_client/connection_test.py index 061e7d4377a0..6675a11d8ef5 100644 --- a/src/py/flwr/client/grpc_client/connection_test.py +++ b/src/py/flwr/client/grpc_client/connection_test.py @@ -132,7 +132,7 @@ def run_client() -> int: server_address=f"[::]:{port}", insecure=True, retry_invoker=RetryInvoker( - wait_factory=exponential, + wait_time_gen_factory=exponential, recoverable_exceptions=grpc.RpcError, max_tries=1, max_time=None, diff --git a/src/py/flwr/common/retry_invoker.py b/src/py/flwr/common/retry_invoker.py index 65962dd60154..6d65577f54bb 100644 --- a/src/py/flwr/common/retry_invoker.py +++ b/src/py/flwr/common/retry_invoker.py @@ -142,6 +142,13 @@ class RetryInvoker: A function accepting an exception instance, returning whether or not to give up prematurely before other give-up conditions are evaluated. If set to None, the strategy is to never give up prematurely. + wait_function: Optional[Callable[[float], None]] (default: None) + A function that defines how to wait between retry attempts. It accepts + one argument, the wait time in seconds, allowing the use of various waiting + mechanisms (e.g., asynchronous waits or event-based synchronization) suitable + for different execution environments. If set to `None`, the `wait_function` + defaults to `time.sleep`, which is ideal for synchronous operations. Custom + functions should manage execution flow to prevent blocking or interference. Examples -------- @@ -159,7 +166,7 @@ class RetryInvoker: # pylint: disable-next=too-many-arguments def __init__( self, - wait_factory: Callable[[], Generator[float, None, None]], + wait_time_gen_factory: Callable[[], Generator[float, None, None]], recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]], max_tries: Optional[int], max_time: Optional[float], @@ -169,8 +176,9 @@ def __init__( on_giveup: Optional[Callable[[RetryState], None]] = None, jitter: Optional[Callable[[float], float]] = full_jitter, should_giveup: Optional[Callable[[Exception], bool]] = None, + wait_function: Optional[Callable[[float], None]] = None, ) -> None: - self.wait_factory = wait_factory + self.wait_time_gen_factory = wait_time_gen_factory self.recoverable_exceptions = recoverable_exceptions self.max_tries = max_tries self.max_time = max_time @@ -179,6 +187,7 @@ def __init__( self.on_giveup = on_giveup self.jitter = jitter self.should_giveup = should_giveup + self.wait_function = wait_function # pylint: disable-next=too-many-locals def invoke( @@ -231,7 +240,7 @@ def try_call_event_handler( handler(cast(RetryState, ref_state[0])) try_cnt = 0 - wait_generator = self.wait_factory() + wait_generator = self.wait_time_gen_factory() start = time.time() ref_state: List[Optional[RetryState]] = [None] @@ -282,7 +291,7 @@ def giveup_check(_exception: Exception) -> bool: try_call_event_handler(self.on_backoff) # Sleep - time.sleep(state.actual_wait) + self.wait_function(state.actual_wait) else: # Trigger success event try_call_event_handler(self.on_success)