From 6226bb0214294602d2b89e6c03cd74c5a3d78cd2 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Sat, 30 Mar 2024 11:30:26 +0000 Subject: [PATCH 1/4] allow changing actual_wait in event handler --- src/py/flwr/common/retry_invoker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/common/retry_invoker.py b/src/py/flwr/common/retry_invoker.py index 5441e766983a..65962dd60154 100644 --- a/src/py/flwr/common/retry_invoker.py +++ b/src/py/flwr/common/retry_invoker.py @@ -282,7 +282,7 @@ def giveup_check(_exception: Exception) -> bool: try_call_event_handler(self.on_backoff) # Sleep - time.sleep(wait_time) + time.sleep(state.actual_wait) else: # Trigger success event try_call_event_handler(self.on_success) From 2232f50dc00e0a395d4b33bb79b10fc8bbd310a9 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Sun, 31 Mar 2024 13:19:01 +0100 Subject: [PATCH 2/4] add wait_function --- src/py/flwr/client/app.py | 2 +- .../flwr/client/grpc_client/connection_test.py | 2 +- src/py/flwr/common/retry_invoker.py | 17 +++++++++++++---- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index d4bd8e2e39e9..1100463e68d8 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -397,7 +397,7 @@ def _load_client_app() -> ClientApp: ) retry_invoker = RetryInvoker( - wait_factory=exponential, + wait_time_gen_factory=exponential, recoverable_exceptions=connection_error_type, max_tries=max_retries, max_time=max_wait_time, diff --git a/src/py/flwr/client/grpc_client/connection_test.py b/src/py/flwr/client/grpc_client/connection_test.py index 061e7d4377a0..6675a11d8ef5 100644 --- a/src/py/flwr/client/grpc_client/connection_test.py +++ b/src/py/flwr/client/grpc_client/connection_test.py @@ -132,7 +132,7 @@ def run_client() -> int: server_address=f"[::]:{port}", insecure=True, retry_invoker=RetryInvoker( - wait_factory=exponential, + wait_time_gen_factory=exponential, recoverable_exceptions=grpc.RpcError, max_tries=1, max_time=None, diff --git a/src/py/flwr/common/retry_invoker.py b/src/py/flwr/common/retry_invoker.py index 65962dd60154..6d65577f54bb 100644 --- a/src/py/flwr/common/retry_invoker.py +++ b/src/py/flwr/common/retry_invoker.py @@ -142,6 +142,13 @@ class RetryInvoker: A function accepting an exception instance, returning whether or not to give up prematurely before other give-up conditions are evaluated. If set to None, the strategy is to never give up prematurely. + wait_function: Optional[Callable[[float], None]] (default: None) + A function that defines how to wait between retry attempts. It accepts + one argument, the wait time in seconds, allowing the use of various waiting + mechanisms (e.g., asynchronous waits or event-based synchronization) suitable + for different execution environments. If set to `None`, the `wait_function` + defaults to `time.sleep`, which is ideal for synchronous operations. Custom + functions should manage execution flow to prevent blocking or interference. Examples -------- @@ -159,7 +166,7 @@ class RetryInvoker: # pylint: disable-next=too-many-arguments def __init__( self, - wait_factory: Callable[[], Generator[float, None, None]], + wait_time_gen_factory: Callable[[], Generator[float, None, None]], recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]], max_tries: Optional[int], max_time: Optional[float], @@ -169,8 +176,9 @@ def __init__( on_giveup: Optional[Callable[[RetryState], None]] = None, jitter: Optional[Callable[[float], float]] = full_jitter, should_giveup: Optional[Callable[[Exception], bool]] = None, + wait_function: Optional[Callable[[float], None]] = None, ) -> None: - self.wait_factory = wait_factory + self.wait_time_gen_factory = wait_time_gen_factory self.recoverable_exceptions = recoverable_exceptions self.max_tries = max_tries self.max_time = max_time @@ -179,6 +187,7 @@ def __init__( self.on_giveup = on_giveup self.jitter = jitter self.should_giveup = should_giveup + self.wait_function = wait_function # pylint: disable-next=too-many-locals def invoke( @@ -231,7 +240,7 @@ def try_call_event_handler( handler(cast(RetryState, ref_state[0])) try_cnt = 0 - wait_generator = self.wait_factory() + wait_generator = self.wait_time_gen_factory() start = time.time() ref_state: List[Optional[RetryState]] = [None] @@ -282,7 +291,7 @@ def giveup_check(_exception: Exception) -> bool: try_call_event_handler(self.on_backoff) # Sleep - time.sleep(state.actual_wait) + self.wait_function(state.actual_wait) else: # Trigger success event try_call_event_handler(self.on_success) From 8d8cbb5c0db6e2ff3cb869ac4ff918e90a090e83 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Sun, 31 Mar 2024 13:22:42 +0100 Subject: [PATCH 3/4] rename wait_time_gen_factory to wait_gen_factory --- src/py/flwr/client/app.py | 2 +- .../client/grpc_client/connection_test.py | 2 +- src/py/flwr/common/retry_invoker.py | 20 +++++++++---------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 1100463e68d8..644d37060d53 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -397,7 +397,7 @@ def _load_client_app() -> ClientApp: ) retry_invoker = RetryInvoker( - wait_time_gen_factory=exponential, + wait_gen_factory=exponential, recoverable_exceptions=connection_error_type, max_tries=max_retries, max_time=max_wait_time, diff --git a/src/py/flwr/client/grpc_client/connection_test.py b/src/py/flwr/client/grpc_client/connection_test.py index 6675a11d8ef5..ed622f55ff1e 100644 --- a/src/py/flwr/client/grpc_client/connection_test.py +++ b/src/py/flwr/client/grpc_client/connection_test.py @@ -132,7 +132,7 @@ def run_client() -> int: server_address=f"[::]:{port}", insecure=True, retry_invoker=RetryInvoker( - wait_time_gen_factory=exponential, + wait_gen_factory=exponential, recoverable_exceptions=grpc.RpcError, max_tries=1, max_time=None, diff --git a/src/py/flwr/common/retry_invoker.py b/src/py/flwr/common/retry_invoker.py index 6d65577f54bb..ffc46f34d144 100644 --- a/src/py/flwr/common/retry_invoker.py +++ b/src/py/flwr/common/retry_invoker.py @@ -107,7 +107,7 @@ class RetryInvoker: Parameters ---------- - wait_factory: Callable[[], Generator[float, None, None]] + wait_gen_factory: Callable[[], Generator[float, None, None]] A generator yielding successive wait times in seconds. If the generator is finite, the giveup event will be triggered when the generator raises `StopIteration`. @@ -129,12 +129,12 @@ class RetryInvoker: data class object detailing the invocation. on_giveup: Optional[Callable[[RetryState], None]] (default: None) A callable to be executed in the event that `max_tries` or `max_time` is - exceeded, `should_giveup` returns True, or `wait_factory()` generator raises + exceeded, `should_giveup` returns True, or `wait_gen_factory()` generator raises `StopInteration`. The parameter is a data class object detailing the invocation. jitter: Optional[Callable[[float], float]] (default: full_jitter) - A function of the value yielded by `wait_factory()` returning the actual time - to wait. This function helps distribute wait times stochastically to avoid + A function of the value yielded by `wait_gen_factory()` returning the actual + time to wait. This function helps distribute wait times stochastically to avoid timing collisions across concurrent clients. Wait times are jittered by default using the `full_jitter` function. To disable jittering, pass `jitter=None`. @@ -166,7 +166,7 @@ class RetryInvoker: # pylint: disable-next=too-many-arguments def __init__( self, - wait_time_gen_factory: Callable[[], Generator[float, None, None]], + wait_gen_factory: Callable[[], Generator[float, None, None]], recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]], max_tries: Optional[int], max_time: Optional[float], @@ -178,7 +178,7 @@ def __init__( should_giveup: Optional[Callable[[Exception], bool]] = None, wait_function: Optional[Callable[[float], None]] = None, ) -> None: - self.wait_time_gen_factory = wait_time_gen_factory + self.wait_gen_factory = wait_gen_factory self.recoverable_exceptions = recoverable_exceptions self.max_tries = max_tries self.max_time = max_time @@ -221,13 +221,13 @@ def invoke( Raises ------ Exception - If the number of tries exceeds `max_tries`, if the total time - exceeds `max_time`, if `wait_factory()` generator raises `StopInteration`, + If the number of tries exceeds `max_tries`, if the total time exceeds + `max_time`, if `wait_gen_factory()` generator raises `StopInteration`, or if the `should_giveup` returns True for a raised exception. Notes ----- - The time between retries is determined by the provided `wait_factory()` + The time between retries is determined by the provided `wait_gen_factory()` generator and can optionally be jittered using the `jitter` function. The recoverable exceptions that trigger a retry, as well as conditions to stop retries, are also determined by the class's initialization parameters. @@ -240,7 +240,7 @@ def try_call_event_handler( handler(cast(RetryState, ref_state[0])) try_cnt = 0 - wait_generator = self.wait_time_gen_factory() + wait_generator = self.wait_gen_factory() start = time.time() ref_state: List[Optional[RetryState]] = [None] From 806a77588cce7a16637b0b527382915b9698c1ef Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Sun, 31 Mar 2024 13:30:05 +0100 Subject: [PATCH 4/4] use time.sleep as default --- src/py/flwr/common/retry_invoker.py | 6 ++++-- src/py/flwr/common/retry_invoker_test.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/py/flwr/common/retry_invoker.py b/src/py/flwr/common/retry_invoker.py index ffc46f34d144..7cec319e7906 100644 --- a/src/py/flwr/common/retry_invoker.py +++ b/src/py/flwr/common/retry_invoker.py @@ -187,6 +187,8 @@ def __init__( self.on_giveup = on_giveup self.jitter = jitter self.should_giveup = should_giveup + if wait_function is None: + wait_function = time.sleep self.wait_function = wait_function # pylint: disable-next=too-many-locals @@ -241,12 +243,12 @@ def try_call_event_handler( try_cnt = 0 wait_generator = self.wait_gen_factory() - start = time.time() + start = time.monotonic() ref_state: List[Optional[RetryState]] = [None] while True: try_cnt += 1 - elapsed_time = time.time() - start + elapsed_time = time.monotonic() - start state = RetryState( target=target, args=args, diff --git a/src/py/flwr/common/retry_invoker_test.py b/src/py/flwr/common/retry_invoker_test.py index e67c0641e2ba..2259ae47ded4 100644 --- a/src/py/flwr/common/retry_invoker_test.py +++ b/src/py/flwr/common/retry_invoker_test.py @@ -35,8 +35,8 @@ def failing_function() -> None: @pytest.fixture(name="mock_time") def fixture_mock_time() -> Generator[MagicMock, None, None]: - """Mock time.time for controlled testing.""" - with patch("time.time") as mock_time: + """Mock time.monotonic for controlled testing.""" + with patch("time.monotonic") as mock_time: yield mock_time