From 4c83a5e9def9f948b8d3645e3775450d771729ef Mon Sep 17 00:00:00 2001 From: jafermarq Date: Wed, 21 Feb 2024 17:57:24 +0000 Subject: [PATCH 01/20] init --- src/py/flwr/common/constant.py | 2 + src/py/flwr/server/app.py | 77 ++++++++++++++++++- .../server/superlink/fleet/vce/__init__.py | 15 ++++ .../server/superlink/fleet/vce/vce_api.py | 68 ++++++++++++++++ 4 files changed, 160 insertions(+), 2 deletions(-) create mode 100644 src/py/flwr/server/superlink/fleet/vce/__init__.py create mode 100644 src/py/flwr/server/superlink/fleet/vce/vce_api.py diff --git a/src/py/flwr/common/constant.py b/src/py/flwr/common/constant.py index 811fff73f06d..2946a594e68c 100644 --- a/src/py/flwr/common/constant.py +++ b/src/py/flwr/common/constant.py @@ -28,10 +28,12 @@ TRANSPORT_TYPE_GRPC_BIDI = "grpc-bidi" TRANSPORT_TYPE_GRPC_RERE = "grpc-rere" TRANSPORT_TYPE_REST = "rest" +TRANSPORT_TYPE_VCE = "vce" TRANSPORT_TYPES = [ TRANSPORT_TYPE_GRPC_BIDI, TRANSPORT_TYPE_GRPC_RERE, TRANSPORT_TYPE_REST, + TRANSPORT_TYPE_VCE, ] MESSAGE_TYPE_GET_PROPERTIES = "get_properties" diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index dbbf63b0fe5e..75fa372d084b 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -14,9 +14,9 @@ # ============================================================================== """Flower server app.""" - import argparse import importlib.util +import json import sys import threading from logging import ERROR, INFO, WARN @@ -24,7 +24,7 @@ from pathlib import Path from signal import SIGINT, SIGTERM, signal from types import FrameType -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union import grpc @@ -34,6 +34,7 @@ MISSING_EXTRA_REST, TRANSPORT_TYPE_GRPC_RERE, TRANSPORT_TYPE_REST, + TRANSPORT_TYPE_VCE, ) from flwr.common.logger import log from flwr.proto.driver_pb2_grpc import ( # pylint: disable=E0611 @@ -315,6 +316,15 @@ def run_fleet_api() -> None: certificates=certificates, ) grpc_servers.append(fleet_server) + elif args.fleet_api_type == TRANSPORT_TYPE_VCE: + _run_fleet_api_vce( + num_supernodes=args.num_supernodes, + client_app_str=args.client_app, + backend=args.backend, + backend_config=args.backend_config, + working_dir=args.dir, + state_factory=state_factory, + ) else: raise ValueError(f"Unknown fleet_api_type: {args.fleet_api_type}") @@ -537,6 +547,29 @@ def _run_fleet_api_grpc_rere( return fleet_grpc_server +# pylint: disable=import-outside-toplevel,too-many-arguments +def _run_fleet_api_vce( + num_supernodes: int, + client_app_str: str, + backend: str, + backend_config: Dict[str, Union[str, int, float]], + working_dir: str, + state_factory: StateFactory, +) -> None: + from flwr.server.superlink.fleet.vce.vce_api import start_vce + + log(INFO, "Flower VCE: Starting Fleet API (VirtualClientEngine)") + + start_vce( + num_supernodes=num_supernodes, + client_app_str=client_app_str, + backend_str=backend, + backend_config=backend_config, + state_factory=state_factory, + working_dir=working_dir, + ) + + # pylint: disable=import-outside-toplevel,too-many-arguments def _run_fleet_api_rest( host: str, @@ -714,6 +747,14 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: help="Start a Fleet API server (REST, experimental)", ) + ex_group.add_argument( + "--vce", + action="store_const", + dest="fleet_api_type", + const=TRANSPORT_TYPE_VCE, + help="Start a Fleet API server (VirtualClientEngine)", + ) + # Fleet API gRPC-rere options grpc_rere_group = parser.add_argument_group( "Fleet API (gRPC-rere) server options", "" @@ -749,3 +790,35 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: type=int, default=1, ) + + # Fleet API VCE options + vce_group = parser.add_argument_group("Fleet API (VCE) server options", "") + vce_group.add_argument( + "--client-app", + help="For example: `client:app` or `project.package.module:wrapper.app`.", + ) + vce_group.add_argument( + "--num-supernodes", + type=int, + help="Number of SuperNodes connected to the SuperLink.", + ) + vce_group.add_argument( + "--backend", + default="ray", + type=str, + help="Simulation Backend that process a ClientApp.", + ) + vce_group.add_argument( + "--backend-config", + type=json.loads, + default='{"num_cpus":2, "num_gpus":0.0}', + help='A dict in the form \'{"":, "":}\' to ' + "configure a backend. Pay close attention to how the quotes and double quotes " + "are set.", + ) + parser.add_argument( + "--dir", + default="", + help="Add specified directory to the PYTHONPATH." + " Default: current working directory.", + ) diff --git a/src/py/flwr/server/superlink/fleet/vce/__init__.py b/src/py/flwr/server/superlink/fleet/vce/__init__.py new file mode 100644 index 000000000000..563f77595e1c --- /dev/null +++ b/src/py/flwr/server/superlink/fleet/vce/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Fleet VirtualClientEngine side.""" diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py new file mode 100644 index 000000000000..a6160125f1ea --- /dev/null +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -0,0 +1,68 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Fleet VirtualClientEngine API.""" + + +from logging import INFO +from typing import Dict, Union + +from flwr.client.clientapp import ClientApp, load_client_app +from flwr.client.node_state import NodeState +from flwr.common.logger import log +from flwr.server.superlink.state import StateFactory + +NodeToPartitionMapping = Dict[int, int] + + +def _register_nodes( + num_nodes: int, state_factory: StateFactory +) -> NodeToPartitionMapping: + """Registre nodes with the StateFactory and create node-id:partition-id mapping.""" + nodes_mapping: NodeToPartitionMapping = {} + state = state_factory.state() + for i in range(num_nodes): + node_id = state.create_node() + nodes_mapping[node_id] = i + log(INFO, "Registered %i nodes", len(nodes_mapping)) + return nodes_mapping + + +# pylint: disable=too-many-arguments,unused-argument +def start_vce( + num_supernodes: int, + client_app_str: str, + backend_str: str, + backend_config: Dict[str, Union[str, int, float]], + state_factory: StateFactory, + working_dir: str, +) -> None: + """Start Fleet API with the VirtualClientEngine (VCE).""" + # Register SuperNodes + nodes_mapping = _register_nodes( + num_nodes=num_supernodes, state_factory=state_factory + ) + + # Construct mapping of NodeStates + node_states: Dict[int, NodeState] = {} + for node_id in nodes_mapping: + node_states[node_id] = NodeState() + + log(INFO, "client_app_str = %s", client_app_str) + + def _load() -> ClientApp: + app: ClientApp = load_client_app(client_app_str) + return app + + # start backend From a85db409e037e7bdd243394fa108d315e55c0b22 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Wed, 21 Feb 2024 18:16:47 +0000 Subject: [PATCH 02/20] base backend --- .../superlink/fleet/vce/backend/__init__.py | 21 ++++++++ .../superlink/fleet/vce/backend/backend.py | 53 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 src/py/flwr/server/superlink/fleet/vce/backend/__init__.py create mode 100644 src/py/flwr/server/superlink/fleet/vce/backend/backend.py diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py b/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py new file mode 100644 index 000000000000..3ff90c288a57 --- /dev/null +++ b/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""VirtualClientEngine Backends.""" + +from .backend import Backend + +__all__ = [ + "Backend", +] diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py new file mode 100644 index 000000000000..ed6f7857d936 --- /dev/null +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -0,0 +1,53 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Generic Backend class for Fleet API using the VCE.""" + + +from abc import ABC, abstractmethod +from typing import Callable, Tuple + +from flwr.client.clientapp import ClientApp +from flwr.common.context import Context +from flwr.common.message import Message + + +class Backend(ABC): + """Abstract base class for a Backend.""" + + async def build(self) -> None: + """Build backend asynchronously. + + Different components need to be inplace before workers in a backend are ready to + accept jobs. When this method finish executed, the backend should be fully ready + to run jobs. + """ + + @property + def num_workers(self) -> int: + """Return number of workers in the backend. + + This is the number of TaskIns that can be run concurrently. + """ + return 0 + + @abstractmethod + async def process_message( + self, + app: Callable[[], ClientApp], + message: Message, + context: Context, + node_id: int, + ) -> Tuple[Message, Context]: + """Submit a job to the backend.""" From b77031219b3b5a74b9fcbace60ee9b67950f6971 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Wed, 21 Feb 2024 18:17:10 +0000 Subject: [PATCH 03/20] update --- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index ed6f7857d936..1c83e604c650 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -48,6 +48,5 @@ async def process_message( app: Callable[[], ClientApp], message: Message, context: Context, - node_id: int, ) -> Tuple[Message, Context]: """Submit a job to the backend.""" From b9c64554ce24eb5114d8949191f9e7eda54eac88 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Wed, 21 Feb 2024 18:25:02 +0000 Subject: [PATCH 04/20] update docstrings --- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index 1c83e604c650..ff28724b14fd 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -30,7 +30,7 @@ async def build(self) -> None: """Build backend asynchronously. Different components need to be inplace before workers in a backend are ready to - accept jobs. When this method finish executed, the backend should be fully ready + accept jobs. When this method finish executing, the backend should be fully ready to run jobs. """ @@ -38,7 +38,7 @@ async def build(self) -> None: def num_workers(self) -> int: """Return number of workers in the backend. - This is the number of TaskIns that can be run concurrently. + This is the number of TaskIns that can be processed concurrently. """ return 0 From cd48539ee8185333d9edaa16d0de7147a38601ec Mon Sep 17 00:00:00 2001 From: jafermarq Date: Wed, 21 Feb 2024 18:26:40 +0000 Subject: [PATCH 05/20] minor fixes --- src/py/flwr/server/app.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index 75fa372d084b..e16ab4dc4b88 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -14,6 +14,7 @@ # ============================================================================== """Flower server app.""" + import argparse import importlib.util import json @@ -800,13 +801,13 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: vce_group.add_argument( "--num-supernodes", type=int, - help="Number of SuperNodes connected to the SuperLink.", + help="Number of SuperNodes to register with the SuperLink.", ) vce_group.add_argument( "--backend", default="ray", type=str, - help="Simulation Backend that process a ClientApp.", + help="Simulation Backend that processes a ClientApp.", ) vce_group.add_argument( "--backend-config", @@ -819,6 +820,6 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--dir", default="", - help="Add specified directory to the PYTHONPATH." + help="Add a specified directory to the PYTHONPATH." " Default: current working directory.", ) From 2791163b68004b14ad11945111ba36ebde38fef1 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 09:43:14 +0000 Subject: [PATCH 06/20] updates --- src/py/flwr/server/app.py | 7 ++++--- .../superlink/fleet/vce/backend/__init__.py | 3 ++- .../server/superlink/fleet/vce/backend/backend.py | 15 +++++++++++---- src/py/flwr/server/superlink/fleet/vce/vce_api.py | 6 ++++-- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index 75fa372d084b..b2e5cefe45d6 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -24,7 +24,7 @@ from pathlib import Path from signal import SIGINT, SIGTERM, signal from types import FrameType -from typing import Dict, List, Optional, Tuple, Union +from typing import List, Optional, Tuple import grpc @@ -55,6 +55,7 @@ start_grpc_server, ) from .superlink.fleet.grpc_rere.fleet_servicer import FleetServicer +from .superlink.fleet.vce.backend import BackendConfig from .superlink.state import StateFactory ADDRESS_DRIVER_API = "0.0.0.0:9091" @@ -552,11 +553,11 @@ def _run_fleet_api_vce( num_supernodes: int, client_app_str: str, backend: str, - backend_config: Dict[str, Union[str, int, float]], + backend_config: BackendConfig, working_dir: str, state_factory: StateFactory, ) -> None: - from flwr.server.superlink.fleet.vce.vce_api import start_vce + from .superlink.fleet.vce.vce_api import start_vce log(INFO, "Flower VCE: Starting Fleet API (VirtualClientEngine)") diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py b/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py index 3ff90c288a57..305cb32c16e5 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py @@ -14,8 +14,9 @@ # ============================================================================== """VirtualClientEngine Backends.""" -from .backend import Backend +from .backend import Backend, BackendConfig __all__ = [ "Backend", + "BackendConfig", ] diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index ff28724b14fd..90745f12e71c 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -16,22 +16,25 @@ from abc import ABC, abstractmethod -from typing import Callable, Tuple +from typing import Callable, Dict, Tuple, Union from flwr.client.clientapp import ClientApp from flwr.common.context import Context from flwr.common.message import Message +BackendConfig = Dict[str, Union[str, int, float]] + class Backend(ABC): """Abstract base class for a Backend.""" - async def build(self) -> None: + @abstractmethod + async def build(self, backend_config: BackendConfig) -> None: """Build backend asynchronously. Different components need to be inplace before workers in a backend are ready to - accept jobs. When this method finish executing, the backend should be fully ready - to run jobs. + accept jobs. When this method finish executing, the backend should be fully + ready to run jobs. """ @property @@ -42,6 +45,10 @@ def num_workers(self) -> int: """ return 0 + @abstractmethod + def is_worker_idle(self) -> bool: + """Report whether a backend worker is idle and can therefore run a ClientApp.""" + @abstractmethod async def process_message( self, diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index a6160125f1ea..0c9b1589e89b 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -16,13 +16,15 @@ from logging import INFO -from typing import Dict, Union +from typing import Dict from flwr.client.clientapp import ClientApp, load_client_app from flwr.client.node_state import NodeState from flwr.common.logger import log from flwr.server.superlink.state import StateFactory +from .backend import BackendConfig + NodeToPartitionMapping = Dict[int, int] @@ -44,7 +46,7 @@ def start_vce( num_supernodes: int, client_app_str: str, backend_str: str, - backend_config: Dict[str, Union[str, int, float]], + backend_config: BackendConfig, state_factory: StateFactory, working_dir: str, ) -> None: From 4ca33ece05702923c3ddd65ae5bf5529e8387241 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 09:59:27 +0000 Subject: [PATCH 07/20] backend-config should contain value types --- src/py/flwr/server/app.py | 7 ++++--- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index b2e5cefe45d6..ec64a6e85186 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -812,10 +812,11 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: vce_group.add_argument( "--backend-config", type=json.loads, - default='{"num_cpus":2, "num_gpus":0.0}', + default='{"client_resources": {"num_cpus":2, "num_gpus":0.0}', help='A dict in the form \'{"":, "":}\' to ' - "configure a backend. Pay close attention to how the quotes and double quotes " - "are set.", + "configure a backend. Values supported in are those included by " + "`flwr.common.typing.ConfigsRecordValues`. " + "Pay close attention to how the quotes and double quotes are set.", ) parser.add_argument( "--dir", diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index 90745f12e71c..3f428061e9a8 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -16,13 +16,14 @@ from abc import ABC, abstractmethod -from typing import Callable, Dict, Tuple, Union +from typing import Callable, Dict, Tuple from flwr.client.clientapp import ClientApp from flwr.common.context import Context from flwr.common.message import Message +from flwr.common.typing import ConfigsRecordValues -BackendConfig = Dict[str, Union[str, int, float]] +BackendConfig = Dict[str, ConfigsRecordValues] class Backend(ABC): From 1b6564ac578174f82bee1604813a6467e7d50840 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 10:00:27 +0000 Subject: [PATCH 08/20] fix --- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index 3f428061e9a8..28a080b12521 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -23,7 +23,7 @@ from flwr.common.message import Message from flwr.common.typing import ConfigsRecordValues -BackendConfig = Dict[str, ConfigsRecordValues] +BackendConfig = Dict[str, Dict[str, ConfigsRecordValues]] class Backend(ABC): From bad872788a7814072ff97cc11a1b00ce7146c14d Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 10:12:12 +0000 Subject: [PATCH 09/20] w/ previous --- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index 28a080b12521..9b7cc18f3c08 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -30,7 +30,7 @@ class Backend(ABC): """Abstract base class for a Backend.""" @abstractmethod - async def build(self, backend_config: BackendConfig) -> None: + async def build(self) -> None: """Build backend asynchronously. Different components need to be inplace before workers in a backend are ready to From a68172fbef7d4bb4416e11cc856f5cd4170314c1 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 11:22:07 +0000 Subject: [PATCH 10/20] fix --- src/py/flwr/server/app.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index e16ab4dc4b88..f686cf1f9bb7 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -317,15 +317,6 @@ def run_fleet_api() -> None: certificates=certificates, ) grpc_servers.append(fleet_server) - elif args.fleet_api_type == TRANSPORT_TYPE_VCE: - _run_fleet_api_vce( - num_supernodes=args.num_supernodes, - client_app_str=args.client_app, - backend=args.backend, - backend_config=args.backend_config, - working_dir=args.dir, - state_factory=state_factory, - ) else: raise ValueError(f"Unknown fleet_api_type: {args.fleet_api_type}") @@ -412,6 +403,15 @@ def run_superlink() -> None: certificates=certificates, ) grpc_servers.append(fleet_server) + elif args.fleet_api_type == TRANSPORT_TYPE_VCE: + _run_fleet_api_vce( + num_supernodes=args.num_supernodes, + client_app_str=args.client_app, + backend=args.backend, + backend_config=args.backend_config, + working_dir=args.dir, + state_factory=state_factory, + ) else: raise ValueError(f"Unknown fleet_api_type: {args.fleet_api_type}") From 68818666aa41aa5f14d1c06aa1eb6de8480eb116 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 11:25:17 +0000 Subject: [PATCH 11/20] fix for json.loads --- src/py/flwr/server/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index d7f8471a0aaa..7acc69be5d50 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -813,7 +813,7 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: vce_group.add_argument( "--backend-config", type=json.loads, - default='{"client_resources": {"num_cpus":2, "num_gpus":0.0}', + default='{"client_resources": {"num_cpus":2, "num_gpus":0.0}}', help='A dict in the form \'{"":, "":}\' to ' "configure a backend. Values supported in are those included by " "`flwr.common.typing.ConfigsRecordValues`. " From 935e3337e774b6777ff3ebc9b3ccd2b99980dbfd Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 12:09:06 +0000 Subject: [PATCH 12/20] keep backend-config as json string --- src/py/flwr/server/app.py | 22 +++++++++---------- .../server/superlink/fleet/vce/vce_api.py | 9 +++++--- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index f686cf1f9bb7..e11a58a19d21 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -17,7 +17,6 @@ import argparse import importlib.util -import json import sys import threading from logging import ERROR, INFO, WARN @@ -25,7 +24,7 @@ from pathlib import Path from signal import SIGINT, SIGTERM, signal from types import FrameType -from typing import Dict, List, Optional, Tuple, Union +from typing import List, Optional, Tuple import grpc @@ -408,7 +407,7 @@ def run_superlink() -> None: num_supernodes=args.num_supernodes, client_app_str=args.client_app, backend=args.backend, - backend_config=args.backend_config, + backend_config_json_str=args.backend_config, working_dir=args.dir, state_factory=state_factory, ) @@ -553,11 +552,11 @@ def _run_fleet_api_vce( num_supernodes: int, client_app_str: str, backend: str, - backend_config: Dict[str, Union[str, int, float]], + backend_config_json_str: str, working_dir: str, state_factory: StateFactory, ) -> None: - from flwr.server.superlink.fleet.vce.vce_api import start_vce + from .superlink.fleet.vce.vce_api import start_vce log(INFO, "Flower VCE: Starting Fleet API (VirtualClientEngine)") @@ -565,7 +564,7 @@ def _run_fleet_api_vce( num_supernodes=num_supernodes, client_app_str=client_app_str, backend_str=backend, - backend_config=backend_config, + backend_config_json_str=backend_config_json_str, state_factory=state_factory, working_dir=working_dir, ) @@ -811,11 +810,12 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: ) vce_group.add_argument( "--backend-config", - type=json.loads, - default='{"num_cpus":2, "num_gpus":0.0}', - help='A dict in the form \'{"":, "":}\' to ' - "configure a backend. Pay close attention to how the quotes and double quotes " - "are set.", + type=str, + default='{"client_resources": {"num_cpus":2, "num_gpus":0.0}}', + help='A JSON-like dict, e.g. \'{"":, "":}\' to ' + "configure a backend. Values supported in are those included by " + "`flwr.common.typing.ConfigsRecordValues`. " + "Pay close attention to how the quotes and double quotes are set.", ) parser.add_argument( "--dir", diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index a6160125f1ea..88144b1c3c09 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -14,9 +14,9 @@ # ============================================================================== """Fleet VirtualClientEngine API.""" - +import json from logging import INFO -from typing import Dict, Union +from typing import Dict from flwr.client.clientapp import ClientApp, load_client_app from flwr.client.node_state import NodeState @@ -44,7 +44,7 @@ def start_vce( num_supernodes: int, client_app_str: str, backend_str: str, - backend_config: Dict[str, Union[str, int, float]], + backend_config_json_str: str, state_factory: StateFactory, working_dir: str, ) -> None: @@ -59,6 +59,9 @@ def start_vce( for node_id in nodes_mapping: node_states[node_id] = NodeState() + # Load backend config + _ = json.loads(backend_config_json_str) + log(INFO, "client_app_str = %s", client_app_str) def _load() -> ClientApp: From 0e02b05cd60b6f364182321f9f0ef8734334ea4e Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 18:50:58 +0000 Subject: [PATCH 13/20] moved import --- src/py/flwr/server/app.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index e11a58a19d21..0a24c1e36e4f 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -55,6 +55,7 @@ start_grpc_server, ) from .superlink.fleet.grpc_rere.fleet_servicer import FleetServicer +from .superlink.fleet.vce.vce_api import start_vce from .superlink.state import StateFactory ADDRESS_DRIVER_API = "0.0.0.0:9091" @@ -547,7 +548,7 @@ def _run_fleet_api_grpc_rere( return fleet_grpc_server -# pylint: disable=import-outside-toplevel,too-many-arguments +# pylint: disable=too-many-arguments def _run_fleet_api_vce( num_supernodes: int, client_app_str: str, @@ -556,8 +557,6 @@ def _run_fleet_api_vce( working_dir: str, state_factory: StateFactory, ) -> None: - from .superlink.fleet.vce.vce_api import start_vce - log(INFO, "Flower VCE: Starting Fleet API (VirtualClientEngine)") start_vce( From fd67f22d09e6097667dbe56e22c53e2d7c96fc01 Mon Sep 17 00:00:00 2001 From: Javier Date: Thu, 22 Feb 2024 19:29:45 +0000 Subject: [PATCH 14/20] Apply suggestions from code review Co-authored-by: Daniel J. Beutel --- src/py/flwr/server/app.py | 7 ++++--- src/py/flwr/server/superlink/fleet/vce/vce_api.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index 0a24c1e36e4f..84eca40e995d 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -799,13 +799,13 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: vce_group.add_argument( "--num-supernodes", type=int, - help="Number of SuperNodes to register with the SuperLink.", + help="Number of simulated SuperNodes.", ) vce_group.add_argument( "--backend", default="ray", type=str, - help="Simulation Backend that processes a ClientApp.", + help="Simulation backend that executes the ClientApp.", ) vce_group.add_argument( "--backend-config", @@ -819,6 +819,7 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--dir", default="", - help="Add a specified directory to the PYTHONPATH." + help="Add specified directory to the PYTHONPATH and load" + "ClientApp from there." " Default: current working directory.", ) diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index 88144b1c3c09..9357693a0e87 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -29,7 +29,7 @@ def _register_nodes( num_nodes: int, state_factory: StateFactory ) -> NodeToPartitionMapping: - """Registre nodes with the StateFactory and create node-id:partition-id mapping.""" + """Register nodes with the StateFactory and create node-id:partition-id mapping.""" nodes_mapping: NodeToPartitionMapping = {} state = state_factory.state() for i in range(num_nodes): From cf004d8ed4bbb520a88b05bc0ce888809149dd17 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 22 Feb 2024 19:43:23 +0000 Subject: [PATCH 15/20] renamed vars; exporting --- src/py/flwr/server/app.py | 27 +++++++++---------- .../server/superlink/fleet/vce/__init__.py | 6 +++++ .../server/superlink/fleet/vce/vce_api.py | 12 ++++----- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index 84eca40e995d..c8cdef9ff323 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -55,7 +55,7 @@ start_grpc_server, ) from .superlink.fleet.grpc_rere.fleet_servicer import FleetServicer -from .superlink.fleet.vce.vce_api import start_vce +from .superlink.fleet.vce import start_vce from .superlink.state import StateFactory ADDRESS_DRIVER_API = "0.0.0.0:9091" @@ -406,9 +406,9 @@ def run_superlink() -> None: elif args.fleet_api_type == TRANSPORT_TYPE_VCE: _run_fleet_api_vce( num_supernodes=args.num_supernodes, - client_app_str=args.client_app, - backend=args.backend, - backend_config_json_str=args.backend_config, + client_app_module_name=args.client_app, + backend_name=args.backend, + backend_config_json_stream=args.backend_config, working_dir=args.dir, state_factory=state_factory, ) @@ -551,9 +551,9 @@ def _run_fleet_api_grpc_rere( # pylint: disable=too-many-arguments def _run_fleet_api_vce( num_supernodes: int, - client_app_str: str, - backend: str, - backend_config_json_str: str, + client_app_module_name: str, + backend_name: str, + backend_config_json_stream: str, working_dir: str, state_factory: StateFactory, ) -> None: @@ -561,9 +561,9 @@ def _run_fleet_api_vce( start_vce( num_supernodes=num_supernodes, - client_app_str=client_app_str, - backend_str=backend, - backend_config_json_str=backend_config_json_str, + client_app_module_name=client_app_module_name, + backend_name=backend_name, + backend_config_json_stream=backend_config_json_stream, state_factory=state_factory, working_dir=working_dir, ) @@ -810,11 +810,10 @@ def _add_args_fleet_api(parser: argparse.ArgumentParser) -> None: vce_group.add_argument( "--backend-config", type=str, - default='{"client_resources": {"num_cpus":2, "num_gpus":0.0}}', - help='A JSON-like dict, e.g. \'{"":, "":}\' to ' + default='{"client_resources": {"num_cpus":1, "num_gpus":0.0}, "tensorflow": 0}', + help='A JSON formatted stream, e.g \'{"":, "":}\' to ' "configure a backend. Values supported in are those included by " - "`flwr.common.typing.ConfigsRecordValues`. " - "Pay close attention to how the quotes and double quotes are set.", + "`flwr.common.typing.ConfigsRecordValues`. ", ) parser.add_argument( "--dir", diff --git a/src/py/flwr/server/superlink/fleet/vce/__init__.py b/src/py/flwr/server/superlink/fleet/vce/__init__.py index 563f77595e1c..72cd76f73761 100644 --- a/src/py/flwr/server/superlink/fleet/vce/__init__.py +++ b/src/py/flwr/server/superlink/fleet/vce/__init__.py @@ -13,3 +13,9 @@ # limitations under the License. # ============================================================================== """Fleet VirtualClientEngine side.""" + +from .vce_api import start_vce + +__all__ = [ + "start_vce", +] diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index 9357693a0e87..8c76b401b915 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -42,9 +42,9 @@ def _register_nodes( # pylint: disable=too-many-arguments,unused-argument def start_vce( num_supernodes: int, - client_app_str: str, - backend_str: str, - backend_config_json_str: str, + client_app_module_name: str, + backend_name: str, + backend_config_json_stream: str, state_factory: StateFactory, working_dir: str, ) -> None: @@ -60,12 +60,12 @@ def start_vce( node_states[node_id] = NodeState() # Load backend config - _ = json.loads(backend_config_json_str) + _ = json.loads(backend_config_json_stream) - log(INFO, "client_app_str = %s", client_app_str) + log(INFO, "client_app_str = %s", client_app_module_name) def _load() -> ClientApp: - app: ClientApp = load_client_app(client_app_str) + app: ClientApp = load_client_app(client_app_module_name) return app # start backend From 5401cff4ea228e6f80b6092a43e31c8609a1dd48 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Fri, 23 Feb 2024 15:00:02 +0100 Subject: [PATCH 16/20] Update src/py/flwr/server/superlink/fleet/vce/backend/__init__.py --- src/py/flwr/server/superlink/fleet/vce/backend/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py b/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py index 305cb32c16e5..98e7a2758f2e 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -"""VirtualClientEngine Backends.""" +"""Simulation Engine Backends.""" from .backend import Backend, BackendConfig From c4a659bb4cfc1c383ad9e0b3b64bb77275d9a6ad Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Fri, 23 Feb 2024 15:00:07 +0100 Subject: [PATCH 17/20] Update src/py/flwr/server/superlink/fleet/vce/backend/backend.py --- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index 9b7cc18f3c08..d85bc4c2b543 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -"""Generic Backend class for Fleet API using the VCE.""" +"""Generic Backend class for Fleet API using the Simulation Engine.""" from abc import ABC, abstractmethod From 8b70316166987fbcca3be025bedb0e5b58c914a1 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Fri, 23 Feb 2024 15:00:12 +0100 Subject: [PATCH 18/20] Update src/py/flwr/server/superlink/fleet/vce/backend/backend.py --- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index d85bc4c2b543..39fa001dd391 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -27,7 +27,7 @@ class Backend(ABC): - """Abstract base class for a Backend.""" + """Abstract base class for a Simulation Engine Backend.""" @abstractmethod async def build(self) -> None: From 277af9d67d90df6ee364ef8e85de5447614c506b Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Fri, 23 Feb 2024 15:00:16 +0100 Subject: [PATCH 19/20] Update src/py/flwr/server/superlink/fleet/vce/backend/backend.py --- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index 39fa001dd391..d21c11f52c0a 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -33,8 +33,8 @@ class Backend(ABC): async def build(self) -> None: """Build backend asynchronously. - Different components need to be inplace before workers in a backend are ready to - accept jobs. When this method finish executing, the backend should be fully + Different components need to be in place before workers in a backend are ready to + accept jobs. When this method finishes executing, the backend should be fully ready to run jobs. """ From 1ac487ea38479835d3f6dcfdde46eadd457895b7 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Fri, 23 Feb 2024 14:53:03 +0000 Subject: [PATCH 20/20] format --- src/py/flwr/server/superlink/fleet/vce/backend/backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py index d21c11f52c0a..b6cddf2eefa5 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/backend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/backend.py @@ -33,8 +33,8 @@ class Backend(ABC): async def build(self) -> None: """Build backend asynchronously. - Different components need to be in place before workers in a backend are ready to - accept jobs. When this method finishes executing, the backend should be fully + Different components need to be in place before workers in a backend are ready + to accept jobs. When this method finishes executing, the backend should be fully ready to run jobs. """