From 7ab8df048bc5df1ff6229b8e9611889b8dd851ab Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Mon, 1 Apr 2024 13:34:04 +0100 Subject: [PATCH 1/7] Support custom wait function in `RetryInvoker` (#3183) --- src/py/flwr/client/app.py | 2 +- .../client/grpc_client/connection_test.py | 2 +- src/py/flwr/common/retry_invoker.py | 37 ++++++++++++------- src/py/flwr/common/retry_invoker_test.py | 4 +- 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index d4bd8e2e39e9..644d37060d53 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -397,7 +397,7 @@ def _load_client_app() -> ClientApp: ) retry_invoker = RetryInvoker( - wait_factory=exponential, + wait_gen_factory=exponential, recoverable_exceptions=connection_error_type, max_tries=max_retries, max_time=max_wait_time, diff --git a/src/py/flwr/client/grpc_client/connection_test.py b/src/py/flwr/client/grpc_client/connection_test.py index 061e7d4377a0..ed622f55ff1e 100644 --- a/src/py/flwr/client/grpc_client/connection_test.py +++ b/src/py/flwr/client/grpc_client/connection_test.py @@ -132,7 +132,7 @@ def run_client() -> int: server_address=f"[::]:{port}", insecure=True, retry_invoker=RetryInvoker( - wait_factory=exponential, + wait_gen_factory=exponential, recoverable_exceptions=grpc.RpcError, max_tries=1, max_time=None, diff --git a/src/py/flwr/common/retry_invoker.py b/src/py/flwr/common/retry_invoker.py index 5441e766983a..7cec319e7906 100644 --- a/src/py/flwr/common/retry_invoker.py +++ b/src/py/flwr/common/retry_invoker.py @@ -107,7 +107,7 @@ class RetryInvoker: Parameters ---------- - wait_factory: Callable[[], Generator[float, None, None]] + wait_gen_factory: Callable[[], Generator[float, None, None]] A generator yielding successive wait times in seconds. If the generator is finite, the giveup event will be triggered when the generator raises `StopIteration`. @@ -129,12 +129,12 @@ class RetryInvoker: data class object detailing the invocation. on_giveup: Optional[Callable[[RetryState], None]] (default: None) A callable to be executed in the event that `max_tries` or `max_time` is - exceeded, `should_giveup` returns True, or `wait_factory()` generator raises + exceeded, `should_giveup` returns True, or `wait_gen_factory()` generator raises `StopInteration`. The parameter is a data class object detailing the invocation. jitter: Optional[Callable[[float], float]] (default: full_jitter) - A function of the value yielded by `wait_factory()` returning the actual time - to wait. This function helps distribute wait times stochastically to avoid + A function of the value yielded by `wait_gen_factory()` returning the actual + time to wait. This function helps distribute wait times stochastically to avoid timing collisions across concurrent clients. Wait times are jittered by default using the `full_jitter` function. To disable jittering, pass `jitter=None`. @@ -142,6 +142,13 @@ class RetryInvoker: A function accepting an exception instance, returning whether or not to give up prematurely before other give-up conditions are evaluated. If set to None, the strategy is to never give up prematurely. + wait_function: Optional[Callable[[float], None]] (default: None) + A function that defines how to wait between retry attempts. It accepts + one argument, the wait time in seconds, allowing the use of various waiting + mechanisms (e.g., asynchronous waits or event-based synchronization) suitable + for different execution environments. If set to `None`, the `wait_function` + defaults to `time.sleep`, which is ideal for synchronous operations. Custom + functions should manage execution flow to prevent blocking or interference. Examples -------- @@ -159,7 +166,7 @@ class RetryInvoker: # pylint: disable-next=too-many-arguments def __init__( self, - wait_factory: Callable[[], Generator[float, None, None]], + wait_gen_factory: Callable[[], Generator[float, None, None]], recoverable_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]], max_tries: Optional[int], max_time: Optional[float], @@ -169,8 +176,9 @@ def __init__( on_giveup: Optional[Callable[[RetryState], None]] = None, jitter: Optional[Callable[[float], float]] = full_jitter, should_giveup: Optional[Callable[[Exception], bool]] = None, + wait_function: Optional[Callable[[float], None]] = None, ) -> None: - self.wait_factory = wait_factory + self.wait_gen_factory = wait_gen_factory self.recoverable_exceptions = recoverable_exceptions self.max_tries = max_tries self.max_time = max_time @@ -179,6 +187,9 @@ def __init__( self.on_giveup = on_giveup self.jitter = jitter self.should_giveup = should_giveup + if wait_function is None: + wait_function = time.sleep + self.wait_function = wait_function # pylint: disable-next=too-many-locals def invoke( @@ -212,13 +223,13 @@ def invoke( Raises ------ Exception - If the number of tries exceeds `max_tries`, if the total time - exceeds `max_time`, if `wait_factory()` generator raises `StopInteration`, + If the number of tries exceeds `max_tries`, if the total time exceeds + `max_time`, if `wait_gen_factory()` generator raises `StopInteration`, or if the `should_giveup` returns True for a raised exception. Notes ----- - The time between retries is determined by the provided `wait_factory()` + The time between retries is determined by the provided `wait_gen_factory()` generator and can optionally be jittered using the `jitter` function. The recoverable exceptions that trigger a retry, as well as conditions to stop retries, are also determined by the class's initialization parameters. @@ -231,13 +242,13 @@ def try_call_event_handler( handler(cast(RetryState, ref_state[0])) try_cnt = 0 - wait_generator = self.wait_factory() - start = time.time() + wait_generator = self.wait_gen_factory() + start = time.monotonic() ref_state: List[Optional[RetryState]] = [None] while True: try_cnt += 1 - elapsed_time = time.time() - start + elapsed_time = time.monotonic() - start state = RetryState( target=target, args=args, @@ -282,7 +293,7 @@ def giveup_check(_exception: Exception) -> bool: try_call_event_handler(self.on_backoff) # Sleep - time.sleep(wait_time) + self.wait_function(state.actual_wait) else: # Trigger success event try_call_event_handler(self.on_success) diff --git a/src/py/flwr/common/retry_invoker_test.py b/src/py/flwr/common/retry_invoker_test.py index e67c0641e2ba..2259ae47ded4 100644 --- a/src/py/flwr/common/retry_invoker_test.py +++ b/src/py/flwr/common/retry_invoker_test.py @@ -35,8 +35,8 @@ def failing_function() -> None: @pytest.fixture(name="mock_time") def fixture_mock_time() -> Generator[MagicMock, None, None]: - """Mock time.time for controlled testing.""" - with patch("time.time") as mock_time: + """Mock time.monotonic for controlled testing.""" + with patch("time.monotonic") as mock_time: yield mock_time From 930c88654e987fc730353a3cbc0403b66e974ab6 Mon Sep 17 00:00:00 2001 From: Javier Date: Mon, 1 Apr 2024 15:17:18 +0100 Subject: [PATCH 2/7] Set deprecated baselines to use pillow 10.2.0 (#3186) --- baselines/flwr_baselines/pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/baselines/flwr_baselines/pyproject.toml b/baselines/flwr_baselines/pyproject.toml index f0b2ac84e66e..add99938d2a3 100644 --- a/baselines/flwr_baselines/pyproject.toml +++ b/baselines/flwr_baselines/pyproject.toml @@ -51,6 +51,7 @@ wget = "^3.2" virtualenv = "^20.24.6" pandas = "^1.5.3" pyhamcrest = "^2.0.4" +pillow = "==10.2.0" [tool.poetry.dev-dependencies] isort = "==5.13.2" From 41b491b59713c1524cbd6da8ce73b6910b036d3f Mon Sep 17 00:00:00 2001 From: Javier Date: Mon, 1 Apr 2024 15:29:22 +0100 Subject: [PATCH 3/7] Add delay to Simulation Engine termination (#3184) --- src/py/flwr/server/superlink/fleet/vce/vce_api.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index 9736ae0fb57f..5fec10940343 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -223,6 +223,7 @@ async def run( # pylint: disable=too-many-arguments,unused-argument,too-many-locals,too-many-branches +# pylint: disable=too-many-statements def start_vce( backend_name: str, backend_config_json_stream: str, @@ -341,6 +342,13 @@ def _load() -> ClientApp: ) ) except LoadClientAppError as loadapp_ex: + f_stop_delay = 10 + log( + ERROR, + "LoadClientAppError exception encountered. Terminating simulation in %is", + f_stop_delay, + ) + time.sleep(f_stop_delay) f_stop.set() # set termination event raise loadapp_ex except Exception as ex: From 9842e41615a6ae8fb47c3aac78473bf31d8cb368 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Mon, 1 Apr 2024 18:03:22 +0100 Subject: [PATCH 4/7] Send ping from SuperNode (#3181) --- .../client/grpc_rere_client/connection.py | 94 +++++++++---- src/py/flwr/client/heartbeat.py | 72 ++++++++++ src/py/flwr/client/heartbeat_test.py | 59 ++++++++ src/py/flwr/client/rest_client/connection.py | 128 ++++++++++++++---- src/py/flwr/common/constant.py | 6 + .../fleet/message_handler/message_handler.py | 3 +- .../superlink/fleet/rest_rere/rest_api.py | 28 ++++ 7 files changed, 337 insertions(+), 53 deletions(-) create mode 100644 src/py/flwr/client/heartbeat.py create mode 100644 src/py/flwr/client/heartbeat_test.py diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index e6e22998b947..06573ffaafb7 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -15,15 +15,24 @@ """Contextmanager for a gRPC request-response channel to the Flower server.""" +import random +import threading from contextlib import contextmanager from copy import copy from logging import DEBUG, ERROR from pathlib import Path -from typing import Callable, Dict, Iterator, Optional, Tuple, Union, cast +from typing import Callable, Iterator, Optional, Tuple, Union, cast +from flwr.client.heartbeat import start_ping_loop from flwr.client.message_handler.message_handler import validate_out_message from flwr.client.message_handler.task_handler import get_task_ins, validate_task_ins from flwr.common import GRPC_MAX_MESSAGE_LENGTH +from flwr.common.constant import ( + PING_BASE_MULTIPLIER, + PING_CALL_TIMEOUT, + PING_DEFAULT_INTERVAL, + PING_RANDOM_RANGE, +) from flwr.common.grpc import create_channel from flwr.common.logger import log, warn_experimental_feature from flwr.common.message import Message, Metadata @@ -32,6 +41,8 @@ from flwr.proto.fleet_pb2 import ( # pylint: disable=E0611 CreateNodeRequest, DeleteNodeRequest, + PingRequest, + PingResponse, PullTaskInsRequest, PushTaskResRequest, ) @@ -39,9 +50,6 @@ from flwr.proto.node_pb2 import Node # pylint: disable=E0611 from flwr.proto.task_pb2 import TaskIns # pylint: disable=E0611 -KEY_NODE = "node" -KEY_METADATA = "in_message_metadata" - def on_channel_state_change(channel_connectivity: str) -> None: """Log channel connectivity.""" @@ -49,7 +57,7 @@ def on_channel_state_change(channel_connectivity: str) -> None: @contextmanager -def grpc_request_response( +def grpc_request_response( # pylint: disable=R0914, R0915 server_address: str, insecure: bool, retry_invoker: RetryInvoker, @@ -107,47 +115,81 @@ def grpc_request_response( max_message_length=max_message_length, ) channel.subscribe(on_channel_state_change) - stub = FleetStub(channel) - - # Necessary state to validate messages to be sent - state: Dict[str, Optional[Metadata]] = {KEY_METADATA: None} - # Enable create_node and delete_node to store node - node_store: Dict[str, Optional[Node]] = {KEY_NODE: None} + # Shared variables for inner functions + stub = FleetStub(channel) + metadata: Optional[Metadata] = None + node: Optional[Node] = None + ping_thread: Optional[threading.Thread] = None + ping_stop_event = threading.Event() ########################################################################### - # receive/send functions + # ping/create_node/delete_node/receive/send functions ########################################################################### + def ping() -> None: + # Get Node + if node is None: + log(ERROR, "Node instance missing") + return + + # Construct the ping request + req = PingRequest(node=node, ping_interval=PING_DEFAULT_INTERVAL) + + # Call FleetAPI + res: PingResponse = stub.Ping(req, timeout=PING_CALL_TIMEOUT) + + # Check if success + if not res.success: + raise RuntimeError("Ping failed unexpectedly.") + + # Wait + rd = random.uniform(*PING_RANDOM_RANGE) + next_interval: float = PING_DEFAULT_INTERVAL - PING_CALL_TIMEOUT + next_interval *= PING_BASE_MULTIPLIER + rd + if not ping_stop_event.is_set(): + ping_stop_event.wait(next_interval) + def create_node() -> None: """Set create_node.""" + # Call FleetAPI create_node_request = CreateNodeRequest() create_node_response = retry_invoker.invoke( stub.CreateNode, request=create_node_request, ) - node_store[KEY_NODE] = create_node_response.node + + # Remember the node and the ping-loop thread + nonlocal node, ping_thread + node = cast(Node, create_node_response.node) + ping_thread = start_ping_loop(ping, ping_stop_event) def delete_node() -> None: """Set delete_node.""" # Get Node - if node_store[KEY_NODE] is None: + nonlocal node + if node is None: log(ERROR, "Node instance missing") return - node: Node = cast(Node, node_store[KEY_NODE]) + # Stop the ping-loop thread + ping_stop_event.set() + if ping_thread is not None: + ping_thread.join() + + # Call FleetAPI delete_node_request = DeleteNodeRequest(node=node) retry_invoker.invoke(stub.DeleteNode, request=delete_node_request) - del node_store[KEY_NODE] + # Cleanup + node = None def receive() -> Optional[Message]: """Receive next task from server.""" # Get Node - if node_store[KEY_NODE] is None: + if node is None: log(ERROR, "Node instance missing") return None - node: Node = cast(Node, node_store[KEY_NODE]) # Request instructions (task) from server request = PullTaskInsRequest(node=node) @@ -167,7 +209,8 @@ def receive() -> Optional[Message]: in_message = message_from_taskins(task_ins) if task_ins else None # Remember `metadata` of the in message - state[KEY_METADATA] = copy(in_message.metadata) if in_message else None + nonlocal metadata + metadata = copy(in_message.metadata) if in_message else None # Return the message if available return in_message @@ -175,18 +218,18 @@ def receive() -> Optional[Message]: def send(message: Message) -> None: """Send task result back to server.""" # Get Node - if node_store[KEY_NODE] is None: + if node is None: log(ERROR, "Node instance missing") return - # Get incoming message - in_metadata = state[KEY_METADATA] - if in_metadata is None: + # Get the metadata of the incoming message + nonlocal metadata + if metadata is None: log(ERROR, "No current message") return # Validate out message - if not validate_out_message(message, in_metadata): + if not validate_out_message(message, metadata): log(ERROR, "Invalid out message") return @@ -197,7 +240,8 @@ def send(message: Message) -> None: request = PushTaskResRequest(task_res_list=[task_res]) _ = retry_invoker.invoke(stub.PushTaskRes, request) - state[KEY_METADATA] = None + # Cleanup + metadata = None try: # Yield methods diff --git a/src/py/flwr/client/heartbeat.py b/src/py/flwr/client/heartbeat.py new file mode 100644 index 000000000000..0cc979ddfd13 --- /dev/null +++ b/src/py/flwr/client/heartbeat.py @@ -0,0 +1,72 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Heartbeat utility functions.""" + + +import threading +from typing import Callable + +import grpc + +from flwr.common.constant import PING_CALL_TIMEOUT +from flwr.common.retry_invoker import RetryInvoker, RetryState, exponential + + +def _ping_loop(ping_fn: Callable[[], None], stop_event: threading.Event) -> None: + def wait_fn(wait_time: float) -> None: + if not stop_event.is_set(): + stop_event.wait(wait_time) + + def on_backoff(state: RetryState) -> None: + err = state.exception + if not isinstance(err, grpc.RpcError): + return + status_code = err.code() + # If ping call timeout is triggered + if status_code == grpc.StatusCode.DEADLINE_EXCEEDED: + # Avoid long wait time. + if state.actual_wait is None: + return + state.actual_wait = max(state.actual_wait - PING_CALL_TIMEOUT, 0.0) + + def wrapped_ping() -> None: + if not stop_event.is_set(): + ping_fn() + + retrier = RetryInvoker( + exponential, + grpc.RpcError, + max_tries=None, + max_time=None, + on_backoff=on_backoff, + wait_function=wait_fn, + ) + while not stop_event.is_set(): + retrier.invoke(wrapped_ping) + + +def start_ping_loop( + ping_fn: Callable[[], None], stop_event: threading.Event +) -> threading.Thread: + """Start a ping loop in a separate thread. + + This function initializes a new thread that runs a ping loop, allowing for + asynchronous ping operations. The loop can be terminated through the provided stop + event. + """ + thread = threading.Thread(target=_ping_loop, args=(ping_fn, stop_event)) + thread.start() + + return thread diff --git a/src/py/flwr/client/heartbeat_test.py b/src/py/flwr/client/heartbeat_test.py new file mode 100644 index 000000000000..286429e075b1 --- /dev/null +++ b/src/py/flwr/client/heartbeat_test.py @@ -0,0 +1,59 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Unit tests for heartbeat utility functions.""" + + +import threading +import time +import unittest +from unittest.mock import MagicMock + +from .heartbeat import start_ping_loop + + +class TestStartPingLoopWithFailures(unittest.TestCase): + """Test heartbeat utility functions.""" + + def test_ping_loop_terminates(self) -> None: + """Test if the ping loop thread terminates when flagged.""" + # Prepare + ping_fn = MagicMock() + stop_event = threading.Event() + + # Execute + thread = start_ping_loop(ping_fn, stop_event) + time.sleep(1) + stop_event.set() + thread.join(timeout=1) + + # Assert + self.assertTrue(ping_fn.called) + self.assertFalse(thread.is_alive()) + + def test_ping_loop_with_failures_terminates(self) -> None: + """Test if the ping loop thread with failures terminates when flagged.""" + # Prepare + ping_fn = MagicMock(side_effect=RuntimeError()) + stop_event = threading.Event() + + # Execute + thread = start_ping_loop(ping_fn, stop_event) + time.sleep(1) + stop_event.set() + thread.join(timeout=1) + + # Assert + self.assertTrue(ping_fn.called) + self.assertFalse(thread.is_alive()) diff --git a/src/py/flwr/client/rest_client/connection.py b/src/py/flwr/client/rest_client/connection.py index d2cc71ba3b3f..514635103f01 100644 --- a/src/py/flwr/client/rest_client/connection.py +++ b/src/py/flwr/client/rest_client/connection.py @@ -15,16 +15,25 @@ """Contextmanager for a REST request-response channel to the Flower server.""" +import random import sys +import threading from contextlib import contextmanager from copy import copy from logging import ERROR, INFO, WARN -from typing import Callable, Dict, Iterator, Optional, Tuple, Union, cast +from typing import Callable, Iterator, Optional, Tuple, Union +from flwr.client.heartbeat import start_ping_loop from flwr.client.message_handler.message_handler import validate_out_message from flwr.client.message_handler.task_handler import get_task_ins, validate_task_ins from flwr.common import GRPC_MAX_MESSAGE_LENGTH -from flwr.common.constant import MISSING_EXTRA_REST +from flwr.common.constant import ( + MISSING_EXTRA_REST, + PING_BASE_MULTIPLIER, + PING_CALL_TIMEOUT, + PING_DEFAULT_INTERVAL, + PING_RANDOM_RANGE, +) from flwr.common.logger import log from flwr.common.message import Message, Metadata from flwr.common.retry_invoker import RetryInvoker @@ -33,6 +42,8 @@ CreateNodeRequest, CreateNodeResponse, DeleteNodeRequest, + PingRequest, + PingResponse, PullTaskInsRequest, PullTaskInsResponse, PushTaskResRequest, @@ -47,19 +58,15 @@ sys.exit(MISSING_EXTRA_REST) -KEY_NODE = "node" -KEY_METADATA = "in_message_metadata" - - PATH_CREATE_NODE: str = "api/v0/fleet/create-node" PATH_DELETE_NODE: str = "api/v0/fleet/delete-node" PATH_PULL_TASK_INS: str = "api/v0/fleet/pull-task-ins" PATH_PUSH_TASK_RES: str = "api/v0/fleet/push-task-res" +PATH_PING: str = "api/v0/fleet/ping" @contextmanager -# pylint: disable-next=too-many-statements -def http_request_response( +def http_request_response( # pylint: disable=R0914, R0915 server_address: str, insecure: bool, # pylint: disable=unused-argument retry_invoker: RetryInvoker, @@ -127,16 +134,71 @@ def http_request_response( "must be provided as a string path to the client.", ) - # Necessary state to validate messages to be sent - state: Dict[str, Optional[Metadata]] = {KEY_METADATA: None} - - # Enable create_node and delete_node to store node - node_store: Dict[str, Optional[Node]] = {KEY_NODE: None} + # Shared variables for inner functions + metadata: Optional[Metadata] = None + node: Optional[Node] = None + ping_thread: Optional[threading.Thread] = None + ping_stop_event = threading.Event() ########################################################################### - # receive/send functions + # ping/create_node/delete_node/receive/send functions ########################################################################### + def ping() -> None: + # Get Node + if node is None: + log(ERROR, "Node instance missing") + return + + # Construct the ping request + req = PingRequest(node=node, ping_interval=PING_DEFAULT_INTERVAL) + req_bytes: bytes = req.SerializeToString() + + # Send the request + res = requests.post( + url=f"{base_url}/{PATH_PING}", + headers={ + "Accept": "application/protobuf", + "Content-Type": "application/protobuf", + }, + data=req_bytes, + verify=verify, + timeout=PING_CALL_TIMEOUT, + ) + + # Check status code and headers + if res.status_code != 200: + return + if "content-type" not in res.headers: + log( + WARN, + "[Node] POST /%s: missing header `Content-Type`", + PATH_PULL_TASK_INS, + ) + return + if res.headers["content-type"] != "application/protobuf": + log( + WARN, + "[Node] POST /%s: header `Content-Type` has wrong value", + PATH_PULL_TASK_INS, + ) + return + + # Deserialize ProtoBuf from bytes + ping_res = PingResponse() + ping_res.ParseFromString(res.content) + + # Check if success + if not ping_res.success: + raise RuntimeError("Ping failed unexpectedly.") + + # Wait + rd = random.uniform(*PING_RANDOM_RANGE) + next_interval: float = PING_DEFAULT_INTERVAL - PING_CALL_TIMEOUT + next_interval *= PING_BASE_MULTIPLIER + rd + if not ping_stop_event.is_set(): + ping_stop_event.wait(next_interval) + def create_node() -> None: """Set create_node.""" create_node_req_proto = CreateNodeRequest() @@ -175,15 +237,25 @@ def create_node() -> None: # Deserialize ProtoBuf from bytes create_node_response_proto = CreateNodeResponse() create_node_response_proto.ParseFromString(res.content) - # pylint: disable-next=no-member - node_store[KEY_NODE] = create_node_response_proto.node + + # Remember the node and the ping-loop thread + nonlocal node, ping_thread + node = create_node_response_proto.node + ping_thread = start_ping_loop(ping, ping_stop_event) def delete_node() -> None: """Set delete_node.""" - if node_store[KEY_NODE] is None: + nonlocal node + if node is None: log(ERROR, "Node instance missing") return - node: Node = cast(Node, node_store[KEY_NODE]) + + # Stop the ping-loop thread + ping_stop_event.set() + if ping_thread is not None: + ping_thread.join() + + # Send DeleteNode request delete_node_req_proto = DeleteNodeRequest(node=node) delete_node_req_req_bytes: bytes = delete_node_req_proto.SerializeToString() res = retry_invoker.invoke( @@ -215,13 +287,15 @@ def delete_node() -> None: PATH_PULL_TASK_INS, ) + # Cleanup + node = None + def receive() -> Optional[Message]: """Receive next task from server.""" # Get Node - if node_store[KEY_NODE] is None: + if node is None: log(ERROR, "Node instance missing") return None - node: Node = cast(Node, node_store[KEY_NODE]) # Request instructions (task) from server pull_task_ins_req_proto = PullTaskInsRequest(node=node) @@ -273,29 +347,29 @@ def receive() -> Optional[Message]: task_ins = None # Return the Message if available + nonlocal metadata message = None - state[KEY_METADATA] = None if task_ins is not None: message = message_from_taskins(task_ins) - state[KEY_METADATA] = copy(message.metadata) + metadata = copy(message.metadata) log(INFO, "[Node] POST /%s: success", PATH_PULL_TASK_INS) return message def send(message: Message) -> None: """Send task result back to server.""" # Get Node - if node_store[KEY_NODE] is None: + if node is None: log(ERROR, "Node instance missing") return # Get incoming message - in_metadata = state[KEY_METADATA] - if in_metadata is None: + nonlocal metadata + if metadata is None: log(ERROR, "No current message") return # Validate out message - if not validate_out_message(message, in_metadata): + if not validate_out_message(message, metadata): log(ERROR, "Invalid out message") return @@ -321,7 +395,7 @@ def send(message: Message) -> None: timeout=None, ) - state[KEY_METADATA] = None + metadata = None # Check status code and headers if res.status_code != 200: diff --git a/src/py/flwr/common/constant.py b/src/py/flwr/common/constant.py index 7d30a10f5881..99ba2d1d1c63 100644 --- a/src/py/flwr/common/constant.py +++ b/src/py/flwr/common/constant.py @@ -36,6 +36,12 @@ TRANSPORT_TYPE_VCE, ] +# Constants for ping +PING_DEFAULT_INTERVAL = 30 +PING_CALL_TIMEOUT = 5 +PING_BASE_MULTIPLIER = 0.8 +PING_RANDOM_RANGE = (-0.1, 0.1) + class MessageType: """Message type.""" diff --git a/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py b/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py index d4e63a8f2d46..9fa7656198e5 100644 --- a/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py +++ b/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py @@ -63,7 +63,8 @@ def ping( state: State, # pylint: disable=unused-argument ) -> PingResponse: """.""" - return PingResponse(success=True) + res = state.acknowledge_ping(request.node.node_id, request.ping_interval) + return PingResponse(success=res) def pull_task_ins(request: PullTaskInsRequest, state: State) -> PullTaskInsResponse: diff --git a/src/py/flwr/server/superlink/fleet/rest_rere/rest_api.py b/src/py/flwr/server/superlink/fleet/rest_rere/rest_api.py index b022b34c68c8..33d17ef1d579 100644 --- a/src/py/flwr/server/superlink/fleet/rest_rere/rest_api.py +++ b/src/py/flwr/server/superlink/fleet/rest_rere/rest_api.py @@ -21,6 +21,7 @@ from flwr.proto.fleet_pb2 import ( # pylint: disable=E0611 CreateNodeRequest, DeleteNodeRequest, + PingRequest, PullTaskInsRequest, PushTaskResRequest, ) @@ -152,11 +153,38 @@ async def push_task_res(request: Request) -> Response: # Check if token is need ) +async def ping(request: Request) -> Response: + """Ping.""" + _check_headers(request.headers) + + # Get the request body as raw bytes + ping_request_bytes: bytes = await request.body() + + # Deserialize ProtoBuf + ping_request_proto = PingRequest() + ping_request_proto.ParseFromString(ping_request_bytes) + + # Get state from app + state: State = app.state.STATE_FACTORY.state() + + # Handle message + ping_response_proto = message_handler.ping(request=ping_request_proto, state=state) + + # Return serialized ProtoBuf + ping_response_bytes = ping_response_proto.SerializeToString() + return Response( + status_code=200, + content=ping_response_bytes, + headers={"Content-Type": "application/protobuf"}, + ) + + routes = [ Route("/api/v0/fleet/create-node", create_node, methods=["POST"]), Route("/api/v0/fleet/delete-node", delete_node, methods=["POST"]), Route("/api/v0/fleet/pull-task-ins", pull_task_ins, methods=["POST"]), Route("/api/v0/fleet/push-task-res", push_task_res, methods=["POST"]), + Route("/api/v0/fleet/ping", ping, methods=["POST"]), ] app: Starlette = Starlette( From 94204242a737368926e0e48342ffe025dc7b3409 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 1 Apr 2024 19:42:54 +0200 Subject: [PATCH 5/7] Remove experimental / add preview feature warnings (#3187) --- src/py/flwr/client/app.py | 4 +--- src/py/flwr/client/client_app.py | 7 +++++++ src/py/flwr/client/grpc_rere_client/connection.py | 4 +--- src/py/flwr/common/logger.py | 8 ++++---- src/py/flwr/server/server_app.py | 3 +++ 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 644d37060d53..7104ba267f57 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -36,7 +36,7 @@ TRANSPORT_TYPES, ) from flwr.common.exit_handlers import register_exit_handlers -from flwr.common.logger import log, warn_deprecated_feature, warn_experimental_feature +from flwr.common.logger import log, warn_deprecated_feature from flwr.common.message import Error from flwr.common.object_ref import load_app, validate from flwr.common.retry_invoker import RetryInvoker, exponential @@ -385,8 +385,6 @@ def _load_client_app() -> ClientApp: return ClientApp(client_fn=client_fn) load_client_app_fn = _load_client_app - else: - warn_experimental_feature("`load_client_app_fn`") # At this point, only `load_client_app_fn` should be used # Both `client` and `client_fn` must not be used directly diff --git a/src/py/flwr/client/client_app.py b/src/py/flwr/client/client_app.py index 0b56219807c6..79e7720cbb8e 100644 --- a/src/py/flwr/client/client_app.py +++ b/src/py/flwr/client/client_app.py @@ -23,6 +23,7 @@ from flwr.client.mod.utils import make_ffn from flwr.client.typing import ClientFn, Mod from flwr.common import Context, Message, MessageType +from flwr.common.logger import warn_preview_feature from .typing import ClientAppCallable @@ -123,6 +124,8 @@ def train_decorator(train_fn: ClientAppCallable) -> ClientAppCallable: if self._call: raise _registration_error(MessageType.TRAIN) + warn_preview_feature("ClientApp-register-train-function") + # Register provided function with the ClientApp object # Wrap mods around the wrapped step function self._train = make_ffn(train_fn, self._mods) @@ -151,6 +154,8 @@ def evaluate_decorator(evaluate_fn: ClientAppCallable) -> ClientAppCallable: if self._call: raise _registration_error(MessageType.EVALUATE) + warn_preview_feature("ClientApp-register-evaluate-function") + # Register provided function with the ClientApp object # Wrap mods around the wrapped step function self._evaluate = make_ffn(evaluate_fn, self._mods) @@ -179,6 +184,8 @@ def query_decorator(query_fn: ClientAppCallable) -> ClientAppCallable: if self._call: raise _registration_error(MessageType.QUERY) + warn_preview_feature("ClientApp-register-query-function") + # Register provided function with the ClientApp object # Wrap mods around the wrapped step function self._query = make_ffn(query_fn, self._mods) diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index 06573ffaafb7..ba8b0d022685 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -34,7 +34,7 @@ PING_RANDOM_RANGE, ) from flwr.common.grpc import create_channel -from flwr.common.logger import log, warn_experimental_feature +from flwr.common.logger import log from flwr.common.message import Message, Metadata from flwr.common.retry_invoker import RetryInvoker from flwr.common.serde import message_from_taskins, message_to_taskres @@ -103,8 +103,6 @@ def grpc_request_response( # pylint: disable=R0914, R0915 create_node : Optional[Callable] delete_node : Optional[Callable] """ - warn_experimental_feature("`grpc-rere`") - if isinstance(root_certificates, str): root_certificates = Path(root_certificates).read_bytes() diff --git a/src/py/flwr/common/logger.py b/src/py/flwr/common/logger.py index 2bc41773ed61..258809ce062f 100644 --- a/src/py/flwr/common/logger.py +++ b/src/py/flwr/common/logger.py @@ -164,13 +164,13 @@ def configure( log = logger.log # pylint: disable=invalid-name -def warn_experimental_feature(name: str) -> None: - """Warn the user when they use an experimental feature.""" +def warn_preview_feature(name: str) -> None: + """Warn the user when they use a preview feature.""" log( WARN, - """EXPERIMENTAL FEATURE: %s + """PREVIEW FEATURE: %s - This is an experimental feature. It could change significantly or be removed + This is a preview feature. It could change significantly or be removed entirely in future versions of Flower. """, name, diff --git a/src/py/flwr/server/server_app.py b/src/py/flwr/server/server_app.py index 1b2eab87fdaa..ea2eb3fd1a69 100644 --- a/src/py/flwr/server/server_app.py +++ b/src/py/flwr/server/server_app.py @@ -18,6 +18,7 @@ from typing import Callable, Optional from flwr.common import Context, RecordSet +from flwr.common.logger import warn_preview_feature from flwr.server.strategy import Strategy from .client_manager import ClientManager @@ -120,6 +121,8 @@ def main_decorator(main_fn: ServerAppCallable) -> ServerAppCallable: """, ) + warn_preview_feature("ServerApp-register-main-function") + # Register provided function with the ServerApp object self._main = main_fn From 29ac32f7029bad2427f3527180b57110c4f5b6d4 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Tue, 2 Apr 2024 09:49:59 +0100 Subject: [PATCH 6/7] Add ping interval to create node request (#3189) --- src/proto/flwr/proto/fleet.proto | 2 +- .../client/grpc_rere_client/connection.py | 2 +- src/py/flwr/client/rest_client/connection.py | 2 +- src/py/flwr/proto/fleet_pb2.py | 52 +++++++++---------- src/py/flwr/proto/fleet_pb2.pyi | 5 ++ 5 files changed, 34 insertions(+), 29 deletions(-) diff --git a/src/proto/flwr/proto/fleet.proto b/src/proto/flwr/proto/fleet.proto index fa65f3ee9fed..0ef0fea5c6d6 100644 --- a/src/proto/flwr/proto/fleet.proto +++ b/src/proto/flwr/proto/fleet.proto @@ -37,7 +37,7 @@ service Fleet { } // CreateNode messages -message CreateNodeRequest {} +message CreateNodeRequest { double ping_interval = 1; } message CreateNodeResponse { Node node = 1; } // DeleteNode messages diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index ba8b0d022685..25e075f40af7 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -151,7 +151,7 @@ def ping() -> None: def create_node() -> None: """Set create_node.""" # Call FleetAPI - create_node_request = CreateNodeRequest() + create_node_request = CreateNodeRequest(ping_interval=PING_DEFAULT_INTERVAL) create_node_response = retry_invoker.invoke( stub.CreateNode, request=create_node_request, diff --git a/src/py/flwr/client/rest_client/connection.py b/src/py/flwr/client/rest_client/connection.py index 514635103f01..0e6a7ef554e6 100644 --- a/src/py/flwr/client/rest_client/connection.py +++ b/src/py/flwr/client/rest_client/connection.py @@ -201,7 +201,7 @@ def ping() -> None: def create_node() -> None: """Set create_node.""" - create_node_req_proto = CreateNodeRequest() + create_node_req_proto = CreateNodeRequest(ping_interval=PING_DEFAULT_INTERVAL) create_node_req_bytes: bytes = create_node_req_proto.SerializeToString() res = retry_invoker.invoke( diff --git a/src/py/flwr/proto/fleet_pb2.py b/src/py/flwr/proto/fleet_pb2.py index 546987f1c807..06d90c5d1a44 100644 --- a/src/py/flwr/proto/fleet_pb2.py +++ b/src/py/flwr/proto/fleet_pb2.py @@ -16,7 +16,7 @@ from flwr.proto import task_pb2 as flwr_dot_proto_dot_task__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16\x66lwr/proto/fleet.proto\x12\nflwr.proto\x1a\x15\x66lwr/proto/node.proto\x1a\x15\x66lwr/proto/task.proto\"\x13\n\x11\x43reateNodeRequest\"4\n\x12\x43reateNodeResponse\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"3\n\x11\x44\x65leteNodeRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"\x14\n\x12\x44\x65leteNodeResponse\"D\n\x0bPingRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\x12\x15\n\rping_interval\x18\x02 \x01(\x01\"\x1f\n\x0cPingResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\"F\n\x12PullTaskInsRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\x12\x10\n\x08task_ids\x18\x02 \x03(\t\"k\n\x13PullTaskInsResponse\x12(\n\treconnect\x18\x01 \x01(\x0b\x32\x15.flwr.proto.Reconnect\x12*\n\rtask_ins_list\x18\x02 \x03(\x0b\x32\x13.flwr.proto.TaskIns\"@\n\x12PushTaskResRequest\x12*\n\rtask_res_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.TaskRes\"\xae\x01\n\x13PushTaskResResponse\x12(\n\treconnect\x18\x01 \x01(\x0b\x32\x15.flwr.proto.Reconnect\x12=\n\x07results\x18\x02 \x03(\x0b\x32,.flwr.proto.PushTaskResResponse.ResultsEntry\x1a.\n\x0cResultsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\r:\x02\x38\x01\"\x1e\n\tReconnect\x12\x11\n\treconnect\x18\x01 \x01(\x04\x32\x86\x03\n\x05\x46leet\x12M\n\nCreateNode\x12\x1d.flwr.proto.CreateNodeRequest\x1a\x1e.flwr.proto.CreateNodeResponse\"\x00\x12M\n\nDeleteNode\x12\x1d.flwr.proto.DeleteNodeRequest\x1a\x1e.flwr.proto.DeleteNodeResponse\"\x00\x12;\n\x04Ping\x12\x17.flwr.proto.PingRequest\x1a\x18.flwr.proto.PingResponse\"\x00\x12P\n\x0bPullTaskIns\x12\x1e.flwr.proto.PullTaskInsRequest\x1a\x1f.flwr.proto.PullTaskInsResponse\"\x00\x12P\n\x0bPushTaskRes\x12\x1e.flwr.proto.PushTaskResRequest\x1a\x1f.flwr.proto.PushTaskResResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16\x66lwr/proto/fleet.proto\x12\nflwr.proto\x1a\x15\x66lwr/proto/node.proto\x1a\x15\x66lwr/proto/task.proto\"*\n\x11\x43reateNodeRequest\x12\x15\n\rping_interval\x18\x01 \x01(\x01\"4\n\x12\x43reateNodeResponse\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"3\n\x11\x44\x65leteNodeRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"\x14\n\x12\x44\x65leteNodeResponse\"D\n\x0bPingRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\x12\x15\n\rping_interval\x18\x02 \x01(\x01\"\x1f\n\x0cPingResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\"F\n\x12PullTaskInsRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\x12\x10\n\x08task_ids\x18\x02 \x03(\t\"k\n\x13PullTaskInsResponse\x12(\n\treconnect\x18\x01 \x01(\x0b\x32\x15.flwr.proto.Reconnect\x12*\n\rtask_ins_list\x18\x02 \x03(\x0b\x32\x13.flwr.proto.TaskIns\"@\n\x12PushTaskResRequest\x12*\n\rtask_res_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.TaskRes\"\xae\x01\n\x13PushTaskResResponse\x12(\n\treconnect\x18\x01 \x01(\x0b\x32\x15.flwr.proto.Reconnect\x12=\n\x07results\x18\x02 \x03(\x0b\x32,.flwr.proto.PushTaskResResponse.ResultsEntry\x1a.\n\x0cResultsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\r:\x02\x38\x01\"\x1e\n\tReconnect\x12\x11\n\treconnect\x18\x01 \x01(\x04\x32\x86\x03\n\x05\x46leet\x12M\n\nCreateNode\x12\x1d.flwr.proto.CreateNodeRequest\x1a\x1e.flwr.proto.CreateNodeResponse\"\x00\x12M\n\nDeleteNode\x12\x1d.flwr.proto.DeleteNodeRequest\x1a\x1e.flwr.proto.DeleteNodeResponse\"\x00\x12;\n\x04Ping\x12\x17.flwr.proto.PingRequest\x1a\x18.flwr.proto.PingResponse\"\x00\x12P\n\x0bPullTaskIns\x12\x1e.flwr.proto.PullTaskInsRequest\x1a\x1f.flwr.proto.PullTaskInsResponse\"\x00\x12P\n\x0bPushTaskRes\x12\x1e.flwr.proto.PushTaskResRequest\x1a\x1f.flwr.proto.PushTaskResResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -26,29 +26,29 @@ _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._options = None _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_options = b'8\001' _globals['_CREATENODEREQUEST']._serialized_start=84 - _globals['_CREATENODEREQUEST']._serialized_end=103 - _globals['_CREATENODERESPONSE']._serialized_start=105 - _globals['_CREATENODERESPONSE']._serialized_end=157 - _globals['_DELETENODEREQUEST']._serialized_start=159 - _globals['_DELETENODEREQUEST']._serialized_end=210 - _globals['_DELETENODERESPONSE']._serialized_start=212 - _globals['_DELETENODERESPONSE']._serialized_end=232 - _globals['_PINGREQUEST']._serialized_start=234 - _globals['_PINGREQUEST']._serialized_end=302 - _globals['_PINGRESPONSE']._serialized_start=304 - _globals['_PINGRESPONSE']._serialized_end=335 - _globals['_PULLTASKINSREQUEST']._serialized_start=337 - _globals['_PULLTASKINSREQUEST']._serialized_end=407 - _globals['_PULLTASKINSRESPONSE']._serialized_start=409 - _globals['_PULLTASKINSRESPONSE']._serialized_end=516 - _globals['_PUSHTASKRESREQUEST']._serialized_start=518 - _globals['_PUSHTASKRESREQUEST']._serialized_end=582 - _globals['_PUSHTASKRESRESPONSE']._serialized_start=585 - _globals['_PUSHTASKRESRESPONSE']._serialized_end=759 - _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_start=713 - _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_end=759 - _globals['_RECONNECT']._serialized_start=761 - _globals['_RECONNECT']._serialized_end=791 - _globals['_FLEET']._serialized_start=794 - _globals['_FLEET']._serialized_end=1184 + _globals['_CREATENODEREQUEST']._serialized_end=126 + _globals['_CREATENODERESPONSE']._serialized_start=128 + _globals['_CREATENODERESPONSE']._serialized_end=180 + _globals['_DELETENODEREQUEST']._serialized_start=182 + _globals['_DELETENODEREQUEST']._serialized_end=233 + _globals['_DELETENODERESPONSE']._serialized_start=235 + _globals['_DELETENODERESPONSE']._serialized_end=255 + _globals['_PINGREQUEST']._serialized_start=257 + _globals['_PINGREQUEST']._serialized_end=325 + _globals['_PINGRESPONSE']._serialized_start=327 + _globals['_PINGRESPONSE']._serialized_end=358 + _globals['_PULLTASKINSREQUEST']._serialized_start=360 + _globals['_PULLTASKINSREQUEST']._serialized_end=430 + _globals['_PULLTASKINSRESPONSE']._serialized_start=432 + _globals['_PULLTASKINSRESPONSE']._serialized_end=539 + _globals['_PUSHTASKRESREQUEST']._serialized_start=541 + _globals['_PUSHTASKRESREQUEST']._serialized_end=605 + _globals['_PUSHTASKRESRESPONSE']._serialized_start=608 + _globals['_PUSHTASKRESRESPONSE']._serialized_end=782 + _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_start=736 + _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_end=782 + _globals['_RECONNECT']._serialized_start=784 + _globals['_RECONNECT']._serialized_end=814 + _globals['_FLEET']._serialized_start=817 + _globals['_FLEET']._serialized_end=1207 # @@protoc_insertion_point(module_scope) diff --git a/src/py/flwr/proto/fleet_pb2.pyi b/src/py/flwr/proto/fleet_pb2.pyi index e5c5b7366464..5989f45c5c60 100644 --- a/src/py/flwr/proto/fleet_pb2.pyi +++ b/src/py/flwr/proto/fleet_pb2.pyi @@ -16,8 +16,13 @@ DESCRIPTOR: google.protobuf.descriptor.FileDescriptor class CreateNodeRequest(google.protobuf.message.Message): """CreateNode messages""" DESCRIPTOR: google.protobuf.descriptor.Descriptor + PING_INTERVAL_FIELD_NUMBER: builtins.int + ping_interval: builtins.float def __init__(self, + *, + ping_interval: builtins.float = ..., ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["ping_interval",b"ping_interval"]) -> None: ... global___CreateNodeRequest = CreateNodeRequest class CreateNodeResponse(google.protobuf.message.Message): From 01735671818dd31ba1d0318db7b366f0e84f1a01 Mon Sep 17 00:00:00 2001 From: Javier Date: Tue, 2 Apr 2024 10:04:13 +0100 Subject: [PATCH 7/7] Add minimal `ErrorCodes` definition (#3185) Co-authored-by: Daniel J. Beutel --- src/py/flwr/common/constant.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/py/flwr/common/constant.py b/src/py/flwr/common/constant.py index 99ba2d1d1c63..3ee60f6222f9 100644 --- a/src/py/flwr/common/constant.py +++ b/src/py/flwr/common/constant.py @@ -74,3 +74,14 @@ class SType: def __new__(cls) -> SType: """Prevent instantiation.""" raise TypeError(f"{cls.__name__} cannot be instantiated.") + + +class ErrorCode: + """Error codes for Message's Error.""" + + UNKNOWN = 0 + CLIENT_APP_RAISED_EXCEPTION = 1 + + def __new__(cls) -> ErrorCode: + """Prevent instantiation.""" + raise TypeError(f"{cls.__name__} cannot be instantiated.")