From dc1f710001a5f0c0ebfe1cefe8c9772b24c463fc Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Thu, 8 Aug 2024 11:50:16 +0100 Subject: [PATCH 01/24] Initial commit --- pyproject.toml | 1 + src/py/flwr/client/supernode/__init__.py | 2 ++ src/py/flwr/client/supernode/app.py | 35 ++++++++++++++++++++++++ 3 files changed, 38 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 7a176d4d87f7..52a7274d4bd6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ flower-supernode = "flwr.client:run_supernode" flower-client-app = "flwr.client:run_client_app" flower-server-app = "flwr.server:run_server_app" flower-simulation = "flwr.simulation.run_simulation:run_simulation_from_cli" +flower-exec-client-app = "flwr.client.supernode:exec_client_app" [tool.poetry.dependencies] python = "^3.8" diff --git a/src/py/flwr/client/supernode/__init__.py b/src/py/flwr/client/supernode/__init__.py index bc505f693875..8ab13f521dcf 100644 --- a/src/py/flwr/client/supernode/__init__.py +++ b/src/py/flwr/client/supernode/__init__.py @@ -15,10 +15,12 @@ """Flower SuperNode.""" +from .app import exec_client_app as exec_client_app from .app import run_client_app as run_client_app from .app import run_supernode as run_supernode __all__ = [ + "exec_client_app", "run_client_app", "run_supernode", ] diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index 2c60f803f960..80faa73a78f3 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -119,6 +119,17 @@ def run_client_app() -> None: register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) +def exec_client_app() -> None: + """Run process-isolated Flower client app.""" + log(INFO, "Starting Flower ClientApp") + + event(EventType.RUN_CLIENT_APP_ENTER) + + # args = _parse_args_exec_client_app().parse_args() + + register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) + + def _warn_deprecated_server_arg(args: argparse.Namespace) -> None: """Warn about the deprecated argument `--server`.""" if args.server != ADDRESS_FLEET_API_GRPC_RERE: @@ -275,6 +286,13 @@ def _parse_args_run_supernode() -> argparse.ArgumentParser: - `$HOME/.flwr/` in all other cases """, ) + parser.add_argument( + "--isolate", + action="store_true", + help="Run the ClientApp in an isolated process from the SuperNode." + "In this mode, the ClientApp and SuperNode communicate via gRPC." + "By default, both SuperNode and ClientApp run in the same process.", + ) return parser @@ -294,6 +312,23 @@ def _parse_args_run_client_app() -> argparse.ArgumentParser: return parser +def _parse_args_exec_client_app() -> argparse.ArgumentParser: + """Parse exec-client-app command line arguments.""" + parser = argparse.ArgumentParser( + description="Run a Flower client app", + ) + parser.add_argument( + "--address", + help="Address of SuperNode", + ) + parser.add_argument( + "--token", + help="Unique token generated by SuperNode for each client app execution", + ) + + return parser + + def _parse_args_common(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--insecure", From aef1b3ef6cbdc91b3411a1618bc1edec17016c67 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Thu, 8 Aug 2024 13:44:54 +0100 Subject: [PATCH 02/24] Init commit --- src/py/flwr/client/process/process.py | 118 ++++++++++++++++++++++++++ src/py/flwr/client/process/utils.py | 96 +++++++++++++++++++++ src/py/flwr/client/supernode/app.py | 91 ++------------------ 3 files changed, 221 insertions(+), 84 deletions(-) create mode 100644 src/py/flwr/client/process/process.py create mode 100644 src/py/flwr/client/process/utils.py diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py new file mode 100644 index 000000000000..7e93dee91177 --- /dev/null +++ b/src/py/flwr/client/process/process.py @@ -0,0 +1,118 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Flower background ClientApp.""" + +from logging import DEBUG, ERROR, INFO + +import grpc + +# from flwr.cli.install import install_from_fab +from flwr.client.client_app import ClientApp +from flwr.common.grpc import GRPC_MAX_MESSAGE_LENGTH, create_channel +from flwr.common.logger import log +from flwr.common.serde import ( + context_from_proto, + context_to_proto, + message_from_proto, + message_to_proto, + run_from_proto, +) + +# pylint: disable=E0401,E0611 +from flwr.proto.appio_pb2 import PullClientAppInputsRequest, PushClientAppOutputsRequest +from flwr.proto.appio_pb2_grpc import ClientAppIoStub, add_ClientAppIoServicer_to_server + +# pylint: disable=E0611 +from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server + +from .clientappio_servicer import ClientAppIoServicer +from .utils import _get_load_client_app_fn + + +def _run_background_client( # pylint: disable=R0914 + address: str, + token: int, +) -> None: + """Run background Flower ClientApp process.""" + + def on_channel_state_change(channel_connectivity: str) -> None: + """Log channel connectivity.""" + log(DEBUG, channel_connectivity) + + channel = create_channel( + server_address=address, + insecure=True, + ) + channel.subscribe(on_channel_state_change) + + try: + stub = ClientAppIoStub(channel) + + req = PullClientAppInputsRequest(token=token) + res = stub.PullClientAppInputs(req) + # fab_file = res.fab + run = run_from_proto(res.run) + message = message_from_proto(res.message) + context = context_from_proto(res.context) + # Ensures FAB is installed (default is Flower directory) + # install_from_fab( + # fab_file, None, True + # ) + load_client_app_fn = _get_load_client_app_fn( + default_app_ref="", + project_dir="", + multi_app=True, + flwr_dir=None, + ) + # print(f"FAB ID: {run.fab_id}, FAB version: {run.fab_version}") + client_app: ClientApp = load_client_app_fn( + run.fab_id, run.fab_version # To be optimized later + ) + # Execute ClientApp + reply_message = client_app(message=message, context=context) + + proto_message = message_to_proto(reply_message) + proto_context = context_to_proto(context) + req = PushClientAppOutputsRequest( + token=token, + message=proto_message, + context=proto_context, + ) + res = stub.PushClientAppOutputs(req) + except KeyboardInterrupt: + log(INFO, "Closing connection") + except grpc.RpcError as e: + log(ERROR, "GRPC error occurred: %s", str(e)) + finally: + channel.close() + + +def run_clientappio_api_grpc( + address: str = "0.0.0.0:9094", +) -> tuple[grpc.Server, grpc.Server]: + """Run ClientAppIo API (gRPC-rere).""" + clientappio_servicer: grpc.Server = ClientAppIoServicer() + clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server + clientappio_grpc_server = generic_create_grpc_server( + servicer_and_add_fn=( + clientappio_servicer, + clientappio_add_servicer_to_server_fn, + ), + server_address=address, + max_message_length=GRPC_MAX_MESSAGE_LENGTH, + ) + log(INFO, "Starting Flower ClientAppIo gRPC server on %s", address) + clientappio_grpc_server.start() + return clientappio_servicer, clientappio_grpc_server diff --git a/src/py/flwr/client/process/utils.py b/src/py/flwr/client/process/utils.py new file mode 100644 index 000000000000..8135c5208791 --- /dev/null +++ b/src/py/flwr/client/process/utils.py @@ -0,0 +1,96 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Flower ClientApp loading utils.""" + +from logging import DEBUG, WARN +from pathlib import Path +from typing import Callable, Optional + +from flwr.client.client_app import ClientApp, LoadClientAppError +from flwr.common.config import get_flwr_dir, get_project_config, get_project_dir +from flwr.common.logger import log +from flwr.common.object_ref import load_app, validate + + +def _get_load_client_app_fn( + default_app_ref: str, + project_dir: str, + multi_app: bool, + flwr_dir: Optional[str] = None, +) -> Callable[[str, str], ClientApp]: + """Get the load_client_app_fn function. + If `multi_app` is True, this function loads the specified ClientApp + based on `fab_id` and `fab_version`. If `fab_id` is empty, a default + ClientApp will be loaded. + If `multi_app` is False, it ignores `fab_id` and `fab_version` and + loads a default ClientApp. + """ + if not multi_app: + log( + DEBUG, + "Flower SuperNode will load and validate ClientApp `%s`", + default_app_ref, + ) + + valid, error_msg = validate(default_app_ref, project_dir=project_dir) + if not valid and error_msg: + raise LoadClientAppError(error_msg) from None + + def _load(fab_id: str, fab_version: str) -> ClientApp: + runtime_project_dir = Path(project_dir).absolute() + # If multi-app feature is disabled + if not multi_app: + # Set app reference + client_app_ref = default_app_ref + # If multi-app feature is enabled but the fab id is not specified + elif fab_id == "": + if default_app_ref == "": + raise LoadClientAppError( + "Invalid FAB ID: The FAB ID is empty.", + ) from None + + log(WARN, "FAB ID is not provided; the default ClientApp will be loaded.") + + # Set app reference + client_app_ref = default_app_ref + # If multi-app feature is enabled + else: + try: + runtime_project_dir = get_project_dir( + fab_id, fab_version, get_flwr_dir(flwr_dir) + ) + config = get_project_config(runtime_project_dir) + except Exception as e: + raise LoadClientAppError("Failed to load ClientApp") from e + + # Set app reference + client_app_ref = config["tool"]["flwr"]["app"]["components"]["clientapp"] + + # Load ClientApp + log( + DEBUG, + "Loading ClientApp `%s`", + client_app_ref, + ) + client_app = load_app(client_app_ref, LoadClientAppError, runtime_project_dir) + + if not isinstance(client_app, ClientApp): + raise LoadClientAppError( + f"Attribute {client_app_ref} is not of type {ClientApp}", + ) from None + + return client_app + + return _load diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index 80faa73a78f3..b22d3bec01bd 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -18,7 +18,7 @@ import sys from logging import DEBUG, INFO, WARN from pathlib import Path -from typing import Callable, Optional, Tuple +from typing import Optional, Tuple from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.primitives.asymmetric import ec @@ -27,14 +27,8 @@ load_ssh_public_key, ) -from flwr.client.client_app import ClientApp, LoadClientAppError from flwr.common import EventType, event -from flwr.common.config import ( - get_flwr_dir, - get_project_config, - get_project_dir, - parse_config_args, -) +from flwr.common.config import get_flwr_dir, parse_config_args from flwr.common.constant import ( TRANSPORT_TYPE_GRPC_ADAPTER, TRANSPORT_TYPE_GRPC_RERE, @@ -42,9 +36,10 @@ ) from flwr.common.exit_handlers import register_exit_handlers from flwr.common.logger import log, warn_deprecated_feature -from flwr.common.object_ref import load_app, validate from ..app import _start_client_internal +from ..process.process import _run_background_client +from ..process.utils import _get_load_client_app_fn ADDRESS_FLEET_API_GRPC_RERE = "0.0.0.0:9092" @@ -125,7 +120,9 @@ def exec_client_app() -> None: event(EventType.RUN_CLIENT_APP_ENTER) - # args = _parse_args_exec_client_app().parse_args() + args = _parse_args_exec_client_app().parse_args() + + _run_background_client(address=args.address, token=int(args.token)) register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) @@ -185,80 +182,6 @@ def _get_certificates(args: argparse.Namespace) -> Optional[bytes]: return root_certificates -def _get_load_client_app_fn( - default_app_ref: str, - project_dir: str, - multi_app: bool, - flwr_dir: Optional[str] = None, -) -> Callable[[str, str], ClientApp]: - """Get the load_client_app_fn function. - - If `multi_app` is True, this function loads the specified ClientApp - based on `fab_id` and `fab_version`. If `fab_id` is empty, a default - ClientApp will be loaded. - - If `multi_app` is False, it ignores `fab_id` and `fab_version` and - loads a default ClientApp. - """ - if not multi_app: - log( - DEBUG, - "Flower SuperNode will load and validate ClientApp `%s`", - default_app_ref, - ) - - valid, error_msg = validate(default_app_ref, project_dir=project_dir) - if not valid and error_msg: - raise LoadClientAppError(error_msg) from None - - def _load(fab_id: str, fab_version: str) -> ClientApp: - runtime_project_dir = Path(project_dir).absolute() - # If multi-app feature is disabled - if not multi_app: - # Set app reference - client_app_ref = default_app_ref - # If multi-app feature is enabled but the fab id is not specified - elif fab_id == "": - if default_app_ref == "": - raise LoadClientAppError( - "Invalid FAB ID: The FAB ID is empty.", - ) from None - - log(WARN, "FAB ID is not provided; the default ClientApp will be loaded.") - - # Set app reference - client_app_ref = default_app_ref - # If multi-app feature is enabled - else: - try: - runtime_project_dir = get_project_dir( - fab_id, fab_version, get_flwr_dir(flwr_dir) - ) - config = get_project_config(runtime_project_dir) - except Exception as e: - raise LoadClientAppError("Failed to load ClientApp") from e - - # Set app reference - client_app_ref = config["tool"]["flwr"]["app"]["components"]["clientapp"] - - # Load ClientApp - log( - DEBUG, - "Loading ClientApp `%s`", - client_app_ref, - ) - client_app = load_app(client_app_ref, LoadClientAppError, runtime_project_dir) - - if not isinstance(client_app, ClientApp): - raise LoadClientAppError( - f"Attribute {client_app_ref} is not of type {ClientApp}", - ) from None - - return client_app - - return _load - - def _parse_args_run_supernode() -> argparse.ArgumentParser: """Parse flower-supernode command line arguments.""" parser = argparse.ArgumentParser( From eaed17b51a0dc479e9318fa35069f53f43d21eff Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Fri, 9 Aug 2024 10:20:01 +0100 Subject: [PATCH 03/24] Update internal command to flwr-clientapp --- pyproject.toml | 4 ++-- src/py/flwr/client/supernode/__init__.py | 4 ++-- src/py/flwr/client/supernode/app.py | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 52a7274d4bd6..00416de5f21b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ flower-supernode = "flwr.client:run_supernode" flower-client-app = "flwr.client:run_client_app" flower-server-app = "flwr.server:run_server_app" flower-simulation = "flwr.simulation.run_simulation:run_simulation_from_cli" -flower-exec-client-app = "flwr.client.supernode:exec_client_app" +flwr-clientapp = "flwr.client.supernode:flwr_clientapp" [tool.poetry.dependencies] python = "^3.8" @@ -70,7 +70,7 @@ protobuf = "^4.25.2" cryptography = "^42.0.4" pycryptodome = "^3.18.0" iterators = "^0.0.2" -typer = { version = "^0.9.0", extras=["all"] } +typer = { version = "^0.9.0", extras = ["all"] } tomli = "^2.0.1" tomli-w = "^1.0.0" pathspec = "^0.12.1" diff --git a/src/py/flwr/client/supernode/__init__.py b/src/py/flwr/client/supernode/__init__.py index 8ab13f521dcf..ffd7865ea716 100644 --- a/src/py/flwr/client/supernode/__init__.py +++ b/src/py/flwr/client/supernode/__init__.py @@ -15,12 +15,12 @@ """Flower SuperNode.""" -from .app import exec_client_app as exec_client_app from .app import run_client_app as run_client_app +from .app import run_clientapp as run_clientapp from .app import run_supernode as run_supernode __all__ = [ - "exec_client_app", + "run_clientapp", "run_client_app", "run_supernode", ] diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index f6128d312b78..ad407b86d4c7 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -120,13 +120,13 @@ def run_client_app() -> None: register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) -def exec_client_app() -> None: +def flwr_clientapp() -> None: """Run process-isolated Flower client app.""" log(INFO, "Starting Flower ClientApp") event(EventType.RUN_CLIENT_APP_ENTER) - # args = _parse_args_exec_client_app().parse_args() + # args = _parse_args_flwr_clientapp().parse_args() register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) @@ -327,8 +327,8 @@ def _parse_args_run_client_app() -> argparse.ArgumentParser: return parser -def _parse_args_exec_client_app() -> argparse.ArgumentParser: - """Parse exec-client-app command line arguments.""" +def _parse_args_flwr_clientapp() -> argparse.ArgumentParser: + """Parse run-clientapp command line arguments.""" parser = argparse.ArgumentParser( description="Run a Flower client app", ) From 4b320779df15bba1018c6b86f2a38c9b4884d349 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Fri, 9 Aug 2024 10:27:30 +0100 Subject: [PATCH 04/24] Update __init__.py --- src/py/flwr/client/supernode/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/supernode/__init__.py b/src/py/flwr/client/supernode/__init__.py index ffd7865ea716..128d0286d625 100644 --- a/src/py/flwr/client/supernode/__init__.py +++ b/src/py/flwr/client/supernode/__init__.py @@ -15,12 +15,12 @@ """Flower SuperNode.""" +from .app import flwr_clientapp as flwr_clientapp from .app import run_client_app as run_client_app -from .app import run_clientapp as run_clientapp from .app import run_supernode as run_supernode __all__ = [ - "run_clientapp", + "flwr_clientapp", "run_client_app", "run_supernode", ] From ed64df181df2157f53c7550960e6916ce7a93337 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Sun, 11 Aug 2024 01:56:55 +0200 Subject: [PATCH 05/24] Update src/py/flwr/client/supernode/app.py --- src/py/flwr/client/supernode/app.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index ad407b86d4c7..c8b7e23e100e 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -124,12 +124,6 @@ def flwr_clientapp() -> None: """Run process-isolated Flower client app.""" log(INFO, "Starting Flower ClientApp") - event(EventType.RUN_CLIENT_APP_ENTER) - - # args = _parse_args_flwr_clientapp().parse_args() - - register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) - def _warn_deprecated_server_arg(args: argparse.Namespace) -> None: """Warn about the deprecated argument `--server`.""" From 1a059ee886d65ac301db3cf92f646f171dd2cf70 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Sun, 11 Aug 2024 01:58:07 +0200 Subject: [PATCH 06/24] Update src/py/flwr/client/supernode/app.py --- src/py/flwr/client/supernode/app.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index c8b7e23e100e..727facccdb19 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -291,9 +291,9 @@ def _parse_args_run_supernode() -> argparse.ArgumentParser: parser.add_argument( "--isolate", action="store_true", - help="Run the ClientApp in an isolated process from the SuperNode." - "In this mode, the ClientApp and SuperNode communicate via gRPC." - "By default, both SuperNode and ClientApp run in the same process.", + help="Run `ClientApp` in an isolated process from the SuperNode." + "In this mode, `ClientApp` and SuperNode communicate via gRPC." + "By default, both SuperNode and `ClientApp` run in the same process.", ) return parser From e269e2e9791c40a4a94aa71dc3fed11823464a99 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Sun, 11 Aug 2024 21:49:00 +0100 Subject: [PATCH 07/24] Address comments --- src/py/flwr/client/supernode/app.py | 44 +++++++++++++---------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index 727facccdb19..d049bf3684c1 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -124,6 +124,26 @@ def flwr_clientapp() -> None: """Run process-isolated Flower client app.""" log(INFO, "Starting Flower ClientApp") + parser = argparse.ArgumentParser( + description="Run a Flower client app", + ) + parser.add_argument( + "--address", + help="Address of SuperNode", + ) + parser.add_argument( + "--token", + help="Unique token generated by SuperNode for each client app execution", + ) + args = parser.parse_args() + log( + DEBUG, + "Staring isolated `ClientApp` connected to SuperNode at %s " + "with the following token %s.", + args.address, + args.token, + ) + def _warn_deprecated_server_arg(args: argparse.Namespace) -> None: """Warn about the deprecated argument `--server`.""" @@ -288,13 +308,6 @@ def _parse_args_run_supernode() -> argparse.ArgumentParser: - `$HOME/.flwr/` in all other cases """, ) - parser.add_argument( - "--isolate", - action="store_true", - help="Run `ClientApp` in an isolated process from the SuperNode." - "In this mode, `ClientApp` and SuperNode communicate via gRPC." - "By default, both SuperNode and `ClientApp` run in the same process.", - ) return parser @@ -321,23 +334,6 @@ def _parse_args_run_client_app() -> argparse.ArgumentParser: return parser -def _parse_args_flwr_clientapp() -> argparse.ArgumentParser: - """Parse run-clientapp command line arguments.""" - parser = argparse.ArgumentParser( - description="Run a Flower client app", - ) - parser.add_argument( - "--address", - help="Address of SuperNode", - ) - parser.add_argument( - "--token", - help="Unique token generated by SuperNode for each client app execution", - ) - - return parser - - def _parse_args_common(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--insecure", From 57d0613075db0dab24983779e9d73c52df5de63f Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Mon, 12 Aug 2024 11:05:05 +0100 Subject: [PATCH 08/24] Update PR --- src/py/flwr/client/process/process.py | 43 ++++++++++++++++++--------- src/py/flwr/client/supernode/app.py | 1 + 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index 7e93dee91177..5e7eb069206c 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -18,7 +18,6 @@ import grpc -# from flwr.cli.install import install_from_fab from flwr.client.client_app import ClientApp from flwr.common.grpc import GRPC_MAX_MESSAGE_LENGTH, create_channel from flwr.common.logger import log @@ -31,8 +30,14 @@ ) # pylint: disable=E0401,E0611 -from flwr.proto.appio_pb2 import PullClientAppInputsRequest, PushClientAppOutputsRequest -from flwr.proto.appio_pb2_grpc import ClientAppIoStub, add_ClientAppIoServicer_to_server +from flwr.proto.clientappio_pb2 import ( + PullClientAppInputsRequest, + PushClientAppOutputsRequest, +) +from flwr.proto.clientappio_pb2_grpc import ( + ClientAppIoStub, + add_ClientAppIoServicer_to_server, +) # pylint: disable=E0611 from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server @@ -45,7 +50,15 @@ def _run_background_client( # pylint: disable=R0914 address: str, token: int, ) -> None: - """Run background Flower ClientApp process.""" + """Run background Flower ClientApp process. + + Parameters + ---------- + address : str + Address of SuperNode + token : int + Unique SuperNode token for ClientApp-SuperNode authentication + """ def on_channel_state_change(channel_connectivity: str) -> None: """Log channel connectivity.""" @@ -60,31 +73,33 @@ def on_channel_state_change(channel_connectivity: str) -> None: try: stub = ClientAppIoStub(channel) + # Pull Message, Context, and Run from SuperNode req = PullClientAppInputsRequest(token=token) res = stub.PullClientAppInputs(req) - # fab_file = res.fab run = run_from_proto(res.run) + + # Deserialize Message and Context message = message_from_proto(res.message) context = context_from_proto(res.context) - # Ensures FAB is installed (default is Flower directory) - # install_from_fab( - # fab_file, None, True - # ) + load_client_app_fn = _get_load_client_app_fn( default_app_ref="", - project_dir="", + app_path=None, multi_app=True, flwr_dir=None, ) - # print(f"FAB ID: {run.fab_id}, FAB version: {run.fab_version}") - client_app: ClientApp = load_client_app_fn( - run.fab_id, run.fab_version # To be optimized later - ) + + # Load ClientApp + client_app: ClientApp = load_client_app_fn(run.fab_id, run.fab_version) + # Execute ClientApp reply_message = client_app(message=message, context=context) + # Serialize updated Message and Context proto_message = message_to_proto(reply_message) proto_context = context_to_proto(context) + + # Push Message and Context to SuperNode req = PushClientAppOutputsRequest( token=token, message=proto_message, diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index 36f3c43d159e..8f066d60e9ee 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -137,6 +137,7 @@ def flwr_clientapp() -> None: args.address, args.token, ) + _run_background_client(address=args.address, token=int(args.token)) def _warn_deprecated_server_arg(args: argparse.Namespace) -> None: From d6b5b5d75d6f1455b623ad75c862068c78c63765 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Mon, 12 Aug 2024 11:26:37 +0100 Subject: [PATCH 09/24] Address comments --- src/py/flwr/client/app.py | 23 ++++++++++++++++++++ src/py/flwr/client/process/process.py | 30 ++------------------------- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 526f26cb8cc3..134b010f2960 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -22,6 +22,7 @@ from pathlib import Path from typing import Callable, ContextManager, Dict, Optional, Tuple, Type, Union +import grpc from cryptography.hazmat.primitives.asymmetric import ec from grpc import RpcError @@ -43,6 +44,8 @@ from flwr.common.message import Error from flwr.common.retry_invoker import RetryInvoker, RetryState, exponential from flwr.common.typing import Run, UserConfig +from flwr.proto.clientappio_pb2_grpc import add_ClientAppIoServicer_to_server +from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server from .grpc_adapter_client.connection import grpc_adapter from .grpc_client.connection import grpc_connection @@ -50,6 +53,7 @@ from .message_handler.message_handler import handle_control_message from .node_state import NodeState from .numpy_client import NumPyClient +from .process.clientappio_servicer import ClientAppIoServicer def _check_actionable_client( @@ -666,3 +670,22 @@ def signal_handler(sig, frame): # type: ignore signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) + + +def run_clientappio_api_grpc( + address: str = "0.0.0.0:9094", +) -> tuple[grpc.Server, grpc.Server]: + """Run ClientAppIo API (gRPC-rere).""" + clientappio_servicer: grpc.Server = ClientAppIoServicer() + clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server + clientappio_grpc_server = generic_create_grpc_server( + servicer_and_add_fn=( + clientappio_servicer, + clientappio_add_servicer_to_server_fn, + ), + server_address=address, + max_message_length=GRPC_MAX_MESSAGE_LENGTH, + ) + log(INFO, "Starting Flower ClientAppIo gRPC server on %s", address) + clientappio_grpc_server.start() + return clientappio_servicer, clientappio_grpc_server diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index 5e7eb069206c..513352732d6f 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -19,7 +19,7 @@ import grpc from flwr.client.client_app import ClientApp -from flwr.common.grpc import GRPC_MAX_MESSAGE_LENGTH, create_channel +from flwr.common.grpc import create_channel from flwr.common.logger import log from flwr.common.serde import ( context_from_proto, @@ -34,15 +34,8 @@ PullClientAppInputsRequest, PushClientAppOutputsRequest, ) -from flwr.proto.clientappio_pb2_grpc import ( - ClientAppIoStub, - add_ClientAppIoServicer_to_server, -) - -# pylint: disable=E0611 -from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server +from flwr.proto.clientappio_pb2_grpc import ClientAppIoStub -from .clientappio_servicer import ClientAppIoServicer from .utils import _get_load_client_app_fn @@ -112,22 +105,3 @@ def on_channel_state_change(channel_connectivity: str) -> None: log(ERROR, "GRPC error occurred: %s", str(e)) finally: channel.close() - - -def run_clientappio_api_grpc( - address: str = "0.0.0.0:9094", -) -> tuple[grpc.Server, grpc.Server]: - """Run ClientAppIo API (gRPC-rere).""" - clientappio_servicer: grpc.Server = ClientAppIoServicer() - clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server - clientappio_grpc_server = generic_create_grpc_server( - servicer_and_add_fn=( - clientappio_servicer, - clientappio_add_servicer_to_server_fn, - ), - server_address=address, - max_message_length=GRPC_MAX_MESSAGE_LENGTH, - ) - log(INFO, "Starting Flower ClientAppIo gRPC server on %s", address) - clientappio_grpc_server.start() - return clientappio_servicer, clientappio_grpc_server From 5d30a7cff098970d81f7473a775791c7839685cd Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Tue, 13 Aug 2024 12:05:09 +0100 Subject: [PATCH 10/24] Refactor code for tests --- src/py/flwr/client/process/process.py | 48 +++++++++++++++++---------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index 513352732d6f..8afb06df798d 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -19,6 +19,7 @@ import grpc from flwr.client.client_app import ClientApp +from flwr.common import Context, Message from flwr.common.grpc import create_channel from flwr.common.logger import log from flwr.common.serde import ( @@ -28,11 +29,14 @@ message_to_proto, run_from_proto, ) +from flwr.common.typing import Run # pylint: disable=E0401,E0611 from flwr.proto.clientappio_pb2 import ( PullClientAppInputsRequest, + PullClientAppInputsResponse, PushClientAppOutputsRequest, + PushClientAppOutputsResponse, ) from flwr.proto.clientappio_pb2_grpc import ClientAppIoStub @@ -67,13 +71,7 @@ def on_channel_state_change(channel_connectivity: str) -> None: stub = ClientAppIoStub(channel) # Pull Message, Context, and Run from SuperNode - req = PullClientAppInputsRequest(token=token) - res = stub.PullClientAppInputs(req) - run = run_from_proto(res.run) - - # Deserialize Message and Context - message = message_from_proto(res.message) - context = context_from_proto(res.context) + run, message, context = pull_message(stub=stub, token=token) load_client_app_fn = _get_load_client_app_fn( default_app_ref="", @@ -88,20 +86,36 @@ def on_channel_state_change(channel_connectivity: str) -> None: # Execute ClientApp reply_message = client_app(message=message, context=context) - # Serialize updated Message and Context - proto_message = message_to_proto(reply_message) - proto_context = context_to_proto(context) - # Push Message and Context to SuperNode - req = PushClientAppOutputsRequest( - token=token, - message=proto_message, - context=proto_context, - ) - res = stub.PushClientAppOutputs(req) + _ = push_message(token=token, message=reply_message, context=context, stub=stub) except KeyboardInterrupt: log(INFO, "Closing connection") except grpc.RpcError as e: log(ERROR, "GRPC error occurred: %s", str(e)) finally: channel.close() + + +def pull_message(stub: grpc.Channel, token: int) -> tuple[Run, Message, Context]: + """.""" + res: PullClientAppInputsResponse = stub.PullClientAppInputs( + PullClientAppInputsRequest(token=token) + ) + run = run_from_proto(res.run) + message = message_from_proto(res.message) + context = context_from_proto(res.context) + return run, message, context + + +def push_message( + stub: grpc.Channel, token: int, message: Message, context: Context +) -> PushClientAppOutputsResponse: + """.""" + proto_message = message_to_proto(message) + proto_context = context_to_proto(context) + res: PushClientAppOutputsResponse = stub.PushClientAppOutputs( + PushClientAppOutputsRequest( + token=token, message=proto_message, context=proto_context + ) + ) + return res From 18f04e0623f2c59762d456ee3d90b90f0de29da6 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Tue, 13 Aug 2024 12:06:09 +0100 Subject: [PATCH 11/24] Update docstring --- src/py/flwr/client/process/process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index 8afb06df798d..f501ee1f61b4 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -97,7 +97,7 @@ def on_channel_state_change(channel_connectivity: str) -> None: def pull_message(stub: grpc.Channel, token: int) -> tuple[Run, Message, Context]: - """.""" + """Pull message from SuperNode to ClientApp.""" res: PullClientAppInputsResponse = stub.PullClientAppInputs( PullClientAppInputsRequest(token=token) ) @@ -110,7 +110,7 @@ def pull_message(stub: grpc.Channel, token: int) -> tuple[Run, Message, Context] def push_message( stub: grpc.Channel, token: int, message: Message, context: Context ) -> PushClientAppOutputsResponse: - """.""" + """Push message to SuperNode from ClientApp.""" proto_message = message_to_proto(message) proto_context = context_to_proto(context) res: PushClientAppOutputsResponse = stub.PushClientAppOutputs( From 81c2a83355653ba704fa822811e995a02496a1f7 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Thu, 15 Aug 2024 16:53:16 +0100 Subject: [PATCH 12/24] Fix missing import --- src/py/flwr/client/process/utils.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/process/utils.py b/src/py/flwr/client/process/utils.py index 1e39b88c7e94..e52eba93a92b 100644 --- a/src/py/flwr/client/process/utils.py +++ b/src/py/flwr/client/process/utils.py @@ -14,12 +14,17 @@ # ============================================================================== """Flower ClientApp loading utils.""" -from logging import DEBUG, WARN +from logging import DEBUG from pathlib import Path from typing import Callable, Optional from flwr.client.client_app import ClientApp, LoadClientAppError -from flwr.common.config import get_flwr_dir, get_project_config, get_project_dir +from flwr.common.config import ( + get_flwr_dir, + get_metadata_from_config, + get_project_config, + get_project_dir, +) from flwr.common.logger import log from flwr.common.object_ref import load_app, validate From 3ae8021b34314627bc7afac9e123e70295802016 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Thu, 15 Aug 2024 16:56:14 +0100 Subject: [PATCH 13/24] Fix return type --- src/py/flwr/client/process/process.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index f501ee1f61b4..5c7de7c0015e 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -15,6 +15,7 @@ """Flower background ClientApp.""" from logging import DEBUG, ERROR, INFO +from typing import Tuple import grpc @@ -31,7 +32,7 @@ ) from flwr.common.typing import Run -# pylint: disable=E0401,E0611 +# pylint: disable=E0611 from flwr.proto.clientappio_pb2 import ( PullClientAppInputsRequest, PullClientAppInputsResponse, @@ -96,7 +97,7 @@ def on_channel_state_change(channel_connectivity: str) -> None: channel.close() -def pull_message(stub: grpc.Channel, token: int) -> tuple[Run, Message, Context]: +def pull_message(stub: grpc.Channel, token: int) -> Tuple[Run, Message, Context]: """Pull message from SuperNode to ClientApp.""" res: PullClientAppInputsResponse = stub.PullClientAppInputs( PullClientAppInputsRequest(token=token) From 91cce4ebbe034b2f3543accdf80650b906051e18 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Thu, 15 Aug 2024 17:00:29 +0100 Subject: [PATCH 14/24] Change import --- src/py/flwr/server/superlink/fleet/vce/vce_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index a8d02802a8b1..1df821d0f495 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -28,7 +28,7 @@ from flwr.client.client_app import ClientApp, ClientAppException, LoadClientAppError from flwr.client.node_state import NodeState -from flwr.client.supernode.app import _get_load_client_app_fn +from flwr.client.process.utils import _get_load_client_app_fn from flwr.common.constant import ( NUM_PARTITIONS_KEY, PARTITION_ID_KEY, From 1a48975da961aaa43fa80246ed0de65ee92cc66b Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Thu, 15 Aug 2024 17:04:56 +0100 Subject: [PATCH 15/24] fix --- src/py/flwr/client/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index df8f268a3680..53f443f5dbfa 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -675,7 +675,7 @@ def signal_handler(sig, frame): # type: ignore def run_clientappio_api_grpc( address: str = "0.0.0.0:9094", -) -> tuple[grpc.Server, grpc.Server]: +) -> Tuple[grpc.Server, grpc.Server]: """Run ClientAppIo API (gRPC-rere).""" clientappio_servicer: grpc.Server = ClientAppIoServicer() clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server From cd3b888d347d3ec9e90a43f2cb292a5233d07653 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Thu, 15 Aug 2024 17:07:21 +0100 Subject: [PATCH 16/24] Set address --- src/py/flwr/client/app.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 53f443f5dbfa..c60b0705d065 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -55,6 +55,8 @@ from .numpy_client import NumPyClient from .process.clientappio_servicer import ClientAppIoServicer +ADDRESS_CLIENTAPPIO_API_GRPC_RERE = "0.0.0.0:9094" + def _check_actionable_client( client: Optional[Client], client_fn: Optional[ClientFnExt] @@ -674,7 +676,7 @@ def signal_handler(sig, frame): # type: ignore def run_clientappio_api_grpc( - address: str = "0.0.0.0:9094", + address: str = ADDRESS_CLIENTAPPIO_API_GRPC_RERE, ) -> Tuple[grpc.Server, grpc.Server]: """Run ClientAppIo API (gRPC-rere).""" clientappio_servicer: grpc.Server = ClientAppIoServicer() From ea3956abc5705187ffd732f9fba4e1978748a573 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Thu, 15 Aug 2024 20:19:32 +0100 Subject: [PATCH 17/24] Reorder --- src/py/flwr/client/process/process.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index 5c7de7c0015e..2e1cb50818e6 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -72,7 +72,7 @@ def on_channel_state_change(channel_connectivity: str) -> None: stub = ClientAppIoStub(channel) # Pull Message, Context, and Run from SuperNode - run, message, context = pull_message(stub=stub, token=token) + message, context, run = pull_message(stub=stub, token=token) load_client_app_fn = _get_load_client_app_fn( default_app_ref="", @@ -97,15 +97,15 @@ def on_channel_state_change(channel_connectivity: str) -> None: channel.close() -def pull_message(stub: grpc.Channel, token: int) -> Tuple[Run, Message, Context]: +def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run]: """Pull message from SuperNode to ClientApp.""" res: PullClientAppInputsResponse = stub.PullClientAppInputs( PullClientAppInputsRequest(token=token) ) - run = run_from_proto(res.run) message = message_from_proto(res.message) context = context_from_proto(res.context) - return run, message, context + run = run_from_proto(res.run) + return message, context, run def push_message( From f6b9f8560cedbb092e22f1e524fdd5cb57e7a1f9 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Thu, 15 Aug 2024 23:12:15 +0200 Subject: [PATCH 18/24] Update src/py/flwr/client/app.py --- src/py/flwr/client/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index c60b0705d065..b3bc9057b9ca 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -678,7 +678,7 @@ def signal_handler(sig, frame): # type: ignore def run_clientappio_api_grpc( address: str = ADDRESS_CLIENTAPPIO_API_GRPC_RERE, ) -> Tuple[grpc.Server, grpc.Server]: - """Run ClientAppIo API (gRPC-rere).""" + """Run ClientAppIo API gRPC server.""" clientappio_servicer: grpc.Server = ClientAppIoServicer() clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server clientappio_grpc_server = generic_create_grpc_server( From 2a157826ab67806ebbf4b1536ce6366b522514c3 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Fri, 16 Aug 2024 08:43:57 +0100 Subject: [PATCH 19/24] Add exception handling --- src/py/flwr/client/app.py | 4 +-- src/py/flwr/client/process/process.py | 37 ++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index b3bc9057b9ca..e42c4d462fed 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -677,7 +677,7 @@ def signal_handler(sig, frame): # type: ignore def run_clientappio_api_grpc( address: str = ADDRESS_CLIENTAPPIO_API_GRPC_RERE, -) -> Tuple[grpc.Server, grpc.Server]: +) -> Tuple[grpc.Server, ClientAppIoServicer]: """Run ClientAppIo API gRPC server.""" clientappio_servicer: grpc.Server = ClientAppIoServicer() clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server @@ -691,4 +691,4 @@ def run_clientappio_api_grpc( ) log(INFO, "Starting Flower ClientAppIo gRPC server on %s", address) clientappio_grpc_server.start() - return clientappio_servicer, clientappio_grpc_server + return clientappio_grpc_server, clientappio_servicer diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index 2e1cb50818e6..74eac46977a8 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -19,10 +19,12 @@ import grpc -from flwr.client.client_app import ClientApp +from flwr.client.client_app import ClientApp, LoadClientAppError from flwr.common import Context, Message +from flwr.common.constant import ErrorCode from flwr.common.grpc import create_channel from flwr.common.logger import log +from flwr.common.message import Error from flwr.common.serde import ( context_from_proto, context_to_proto, @@ -48,7 +50,7 @@ def _run_background_client( # pylint: disable=R0914 address: str, token: int, ) -> None: - """Run background Flower ClientApp process. + """Run Flower ClientApp process. Parameters ---------- @@ -81,14 +83,35 @@ def on_channel_state_change(channel_connectivity: str) -> None: flwr_dir=None, ) - # Load ClientApp - client_app: ClientApp = load_client_app_fn(run.fab_id, run.fab_version) + try: + # Load ClientApp + client_app: ClientApp = load_client_app_fn(run.fab_id, run.fab_version) - # Execute ClientApp - reply_message = client_app(message=message, context=context) + # Execute ClientApp + reply_message = client_app(message=message, context=context) + except Exception as ex: # pylint: disable=broad-exception-caught + # Don't update/change NodeState + + e_code = ErrorCode.CLIENT_APP_RAISED_EXCEPTION + # Ex fmt: ":<'division by zero'>" + reason = str(type(ex)) + ":<'" + str(ex) + "'>" + exc_entity = "ClientApp" + if isinstance(ex, LoadClientAppError): + reason = ( + "An exception was raised when attempting to load " "`ClientApp`" + ) + e_code = ErrorCode.LOAD_CLIENT_APP_EXCEPTION + + log(ERROR, "%s raised an exception", exc_entity, exc_info=ex) + + # Create error message + reply_message = message.create_error_reply( + error=Error(code=e_code, reason=reason) + ) # Push Message and Context to SuperNode - _ = push_message(token=token, message=reply_message, context=context, stub=stub) + _ = push_message(stub=stub, token=token, message=reply_message, context=context) + except KeyboardInterrupt: log(INFO, "Closing connection") except grpc.RpcError as e: From 42a0336676ae667617a0e5e35392c252437d22ae Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Fri, 16 Aug 2024 08:46:03 +0100 Subject: [PATCH 20/24] Lint --- src/py/flwr/client/process/process.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index 74eac46977a8..c324524fd147 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -97,9 +97,7 @@ def on_channel_state_change(channel_connectivity: str) -> None: reason = str(type(ex)) + ":<'" + str(ex) + "'>" exc_entity = "ClientApp" if isinstance(ex, LoadClientAppError): - reason = ( - "An exception was raised when attempting to load " "`ClientApp`" - ) + reason = "An exception was raised when attempting to load `ClientApp`" e_code = ErrorCode.LOAD_CLIENT_APP_EXCEPTION log(ERROR, "%s raised an exception", exc_entity, exc_info=ex) From 13f557fd974ea8bd551ff9f32e02604b40f9b123 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Fri, 16 Aug 2024 09:30:52 +0100 Subject: [PATCH 21/24] add --- src/py/flwr/client/process/process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index c324524fd147..45f5f9139961 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -"""Flower background ClientApp.""" +"""Flower ClientApp process.""" from logging import DEBUG, ERROR, INFO from typing import Tuple @@ -46,7 +46,7 @@ from .utils import _get_load_client_app_fn -def _run_background_client( # pylint: disable=R0914 +def _run_clientapp( # pylint: disable=R0914 address: str, token: int, ) -> None: From 1d514aff96b2943e7b29042fbd6f2d59330e9743 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Fri, 16 Aug 2024 10:12:54 +0100 Subject: [PATCH 22/24] Remove background from name --- src/py/flwr/client/supernode/app.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index ef2f15cb7207..b2cb1b5f033e 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -38,7 +38,7 @@ from flwr.common.logger import log, warn_deprecated_feature from ..app import _start_client_internal -from ..process.process import _run_background_client +from ..process.process import _run_clientapp from ..process.utils import _get_load_client_app_fn ADDRESS_FLEET_API_GRPC_RERE = "0.0.0.0:9092" @@ -137,7 +137,7 @@ def flwr_clientapp() -> None: args.address, args.token, ) - _run_background_client(address=args.address, token=int(args.token)) + _run_clientapp(address=args.address, token=int(args.token)) def _warn_deprecated_server_arg(args: argparse.Namespace) -> None: From 08153ac8f918321b5725f4fb4d29e11c5a5ac32a Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Fri, 16 Aug 2024 11:25:39 +0100 Subject: [PATCH 23/24] Move on_channel_state_change to top-level --- src/py/flwr/cli/run/run.py | 9 +++++---- src/py/flwr/client/process/process.py | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/py/flwr/cli/run/run.py b/src/py/flwr/cli/run/run.py index fe7b2f32a104..2df14969e24e 100644 --- a/src/py/flwr/cli/run/run.py +++ b/src/py/flwr/cli/run/run.py @@ -35,6 +35,11 @@ from flwr.proto.exec_pb2_grpc import ExecStub +def on_channel_state_change(channel_connectivity: str) -> None: + """Log channel connectivity.""" + log(DEBUG, channel_connectivity) + + # pylint: disable-next=too-many-locals def run( app: Annotated[ @@ -122,10 +127,6 @@ def _run_with_superexec( config_overrides: Optional[List[str]], ) -> None: - def on_channel_state_change(channel_connectivity: str) -> None: - """Log channel connectivity.""" - log(DEBUG, channel_connectivity) - insecure_str = federation_config.get("insecure") if root_certificates := federation_config.get("root-certificates"): root_certificates_bytes = Path(root_certificates).read_bytes() diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index 45f5f9139961..dbf6b5dd14e8 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -46,6 +46,11 @@ from .utils import _get_load_client_app_fn +def on_channel_state_change(channel_connectivity: str) -> None: + """Log channel connectivity.""" + log(DEBUG, channel_connectivity) + + def _run_clientapp( # pylint: disable=R0914 address: str, token: int, @@ -60,10 +65,6 @@ def _run_clientapp( # pylint: disable=R0914 Unique SuperNode token for ClientApp-SuperNode authentication """ - def on_channel_state_change(channel_connectivity: str) -> None: - """Log channel connectivity.""" - log(DEBUG, channel_connectivity) - channel = create_channel( server_address=address, insecure=True, From 7f13cdf697e696682f064f792dcd1d028073ff3d Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Fri, 16 Aug 2024 11:33:26 +0100 Subject: [PATCH 24/24] Lint --- src/py/flwr/client/process/process.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/process/process.py index dbf6b5dd14e8..a1841940823c 100644 --- a/src/py/flwr/client/process/process.py +++ b/src/py/flwr/client/process/process.py @@ -64,7 +64,6 @@ def _run_clientapp( # pylint: disable=R0914 token : int Unique SuperNode token for ClientApp-SuperNode authentication """ - channel = create_channel( server_address=address, insecure=True,