Skip to content

Commit

Permalink
add wait_function
Browse files Browse the repository at this point in the history
  • Loading branch information
panh99 committed Mar 31, 2024
1 parent 6226bb0 commit 2232f50
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/py/flwr/client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def _load_client_app() -> ClientApp:
)

retry_invoker = RetryInvoker(
wait_factory=exponential,
wait_time_gen_factory=exponential,
recoverable_exceptions=connection_error_type,
max_tries=max_retries,
max_time=max_wait_time,
Expand Down
2 changes: 1 addition & 1 deletion src/py/flwr/client/grpc_client/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def run_client() -> int:
server_address=f"[::]:{port}",
insecure=True,
retry_invoker=RetryInvoker(
wait_factory=exponential,
wait_time_gen_factory=exponential,
recoverable_exceptions=grpc.RpcError,
max_tries=1,
max_time=None,
Expand Down
17 changes: 13 additions & 4 deletions src/py/flwr/common/retry_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ class RetryInvoker:
A function accepting an exception instance, returning whether or not
to give up prematurely before other give-up conditions are evaluated.
If set to None, the strategy is to never give up prematurely.
wait_function: Optional[Callable[[float], None]] (default: None)
A function that defines how to wait between retry attempts. It accepts
one argument, the wait time in seconds, allowing the use of various waiting
mechanisms (e.g., asynchronous waits or event-based synchronization) suitable
for different execution environments. If set to `None`, the `wait_function`
defaults to `time.sleep`, which is ideal for synchronous operations. Custom
functions should manage execution flow to prevent blocking or interference.
Examples
--------
Expand All @@ -159,7 +166,7 @@ class RetryInvoker:
# pylint: disable-next=too-many-arguments
def __init__(
self,
wait_factory: Callable[[], Generator[float, None, None]],
wait_time_gen_factory: Callable[[], Generator[float, None, None]],
recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]],
max_tries: Optional[int],
max_time: Optional[float],
Expand All @@ -169,8 +176,9 @@ def __init__(
on_giveup: Optional[Callable[[RetryState], None]] = None,
jitter: Optional[Callable[[float], float]] = full_jitter,
should_giveup: Optional[Callable[[Exception], bool]] = None,
wait_function: Optional[Callable[[float], None]] = None,
) -> None:
self.wait_factory = wait_factory
self.wait_time_gen_factory = wait_time_gen_factory
self.recoverable_exceptions = recoverable_exceptions
self.max_tries = max_tries
self.max_time = max_time
Expand All @@ -179,6 +187,7 @@ def __init__(
self.on_giveup = on_giveup
self.jitter = jitter
self.should_giveup = should_giveup
self.wait_function = wait_function

# pylint: disable-next=too-many-locals
def invoke(
Expand Down Expand Up @@ -231,7 +240,7 @@ def try_call_event_handler(
handler(cast(RetryState, ref_state[0]))

try_cnt = 0
wait_generator = self.wait_factory()
wait_generator = self.wait_time_gen_factory()
start = time.time()
ref_state: List[Optional[RetryState]] = [None]

Expand Down Expand Up @@ -282,7 +291,7 @@ def giveup_check(_exception: Exception) -> bool:
try_call_event_handler(self.on_backoff)

# Sleep
time.sleep(state.actual_wait)
self.wait_function(state.actual_wait)
else:
# Trigger success event
try_call_event_handler(self.on_success)
Expand Down

0 comments on commit 2232f50

Please sign in to comment.