Skip to content

Commit

Permalink
Move run_client_app to supernode.app (#3352)
Browse files Browse the repository at this point in the history
  • Loading branch information
panh99 authored Apr 29, 2024
1 parent 84224e6 commit 20098cb
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 137 deletions.
2 changes: 1 addition & 1 deletion src/py/flwr/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
"""Flower client."""


from .app import run_client_app as run_client_app
from .app import start_client as start_client
from .app import start_numpy_client as start_numpy_client
from .client import Client as Client
from .client_app import ClientApp as ClientApp
from .numpy_client import NumPyClient as NumPyClient
from .supernode import run_client_app as run_client_app
from .supernode import run_supernode as run_supernode
from .typing import ClientFn as ClientFn

Expand Down
132 changes: 0 additions & 132 deletions src/py/flwr/client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,12 @@
# ==============================================================================
"""Flower client app."""

import argparse
import sys
import time
from logging import DEBUG, ERROR, INFO, WARN
from pathlib import Path
from typing import Callable, ContextManager, Optional, Tuple, Type, Union

from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import (
load_ssh_private_key,
load_ssh_public_key,
)
from grpc import RpcError

from flwr.client.client import Client
Expand All @@ -41,141 +35,15 @@
TRANSPORT_TYPES,
ErrorCode,
)
from flwr.common.exit_handlers import register_exit_handlers
from flwr.common.logger import log, warn_deprecated_feature
from flwr.common.message import Error
from flwr.common.object_ref import load_app, validate
from flwr.common.retry_invoker import RetryInvoker, exponential
from flwr.common.secure_aggregation.crypto.symmetric_encryption import (
ssh_types_to_elliptic_curve,
)

from .grpc_client.connection import grpc_connection
from .grpc_rere_client.connection import grpc_request_response
from .message_handler.message_handler import handle_control_message
from .node_state import NodeState
from .numpy_client import NumPyClient
from .supernode.app import parse_args_run_client_app


def run_client_app() -> None:
"""Run Flower client app."""
log(INFO, "Long-running Flower client starting")

event(EventType.RUN_CLIENT_APP_ENTER)

args = _parse_args_run_client_app().parse_args()

# Obtain certificates
if args.insecure:
if args.root_certificates is not None:
sys.exit(
"Conflicting options: The '--insecure' flag disables HTTPS, "
"but '--root-certificates' was also specified. Please remove "
"the '--root-certificates' option when running in insecure mode, "
"or omit '--insecure' to use HTTPS."
)
log(
WARN,
"Option `--insecure` was set. "
"Starting insecure HTTP client connected to %s.",
args.server,
)
root_certificates = None
else:
# Load the certificates if provided, or load the system certificates
cert_path = args.root_certificates
if cert_path is None:
root_certificates = None
else:
root_certificates = Path(cert_path).read_bytes()
log(
DEBUG,
"Starting secure HTTPS client connected to %s "
"with the following certificates: %s.",
args.server,
cert_path,
)

log(
DEBUG,
"Flower will load ClientApp `%s`",
getattr(args, "client-app"),
)

client_app_dir = args.dir
if client_app_dir is not None:
sys.path.insert(0, client_app_dir)

app_ref: str = getattr(args, "client-app")
valid, error_msg = validate(app_ref)
if not valid and error_msg:
raise LoadClientAppError(error_msg) from None

def _load() -> ClientApp:
client_app = load_app(app_ref, LoadClientAppError)

if not isinstance(client_app, ClientApp):
raise LoadClientAppError(
f"Attribute {app_ref} is not of type {ClientApp}",
) from None

return client_app

authentication_keys = _try_setup_client_authentication(args)

_start_client_internal(
server_address=args.server,
load_client_app_fn=_load,
transport="rest" if args.rest else "grpc-rere",
root_certificates=root_certificates,
insecure=args.insecure,
authentication_keys=authentication_keys,
max_retries=args.max_retries,
max_wait_time=args.max_wait_time,
)
register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE)


def _try_setup_client_authentication(
args: argparse.Namespace,
) -> Optional[Tuple[ec.EllipticCurvePrivateKey, ec.EllipticCurvePublicKey]]:
if not args.authentication_keys:
return None

ssh_private_key = load_ssh_private_key(
Path(args.authentication_keys[0]).read_bytes(),
None,
)
ssh_public_key = load_ssh_public_key(Path(args.authentication_keys[1]).read_bytes())

try:
client_private_key, client_public_key = ssh_types_to_elliptic_curve(
ssh_private_key, ssh_public_key
)
except TypeError:
sys.exit(
"The file paths provided could not be read as a private and public "
"key pair. Client authentication requires an elliptic curve public and "
"private key pair. Please provide the file paths containing elliptic "
"curve private and public keys to '--authentication-keys'."
)

return (
client_private_key,
client_public_key,
)


def _parse_args_run_client_app() -> argparse.ArgumentParser:
"""Parse flower-client-app command line arguments."""
parser = argparse.ArgumentParser(
description="Start a Flower client app",
)

parse_args_run_client_app(parser=parser)

return parser


def _check_actionable_client(
Expand Down
2 changes: 2 additions & 0 deletions src/py/flwr/client/supernode/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
"""Flower SuperNode."""


from .app import run_client_app as run_client_app
from .app import run_supernode as run_supernode

__all__ = [
"run_client_app",
"run_supernode",
]
162 changes: 158 additions & 4 deletions src/py/flwr/client/supernode/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,27 @@
"""Flower SuperNode."""

import argparse
from logging import DEBUG, INFO
import sys
from logging import DEBUG, INFO, WARN
from pathlib import Path
from typing import Callable, Optional, Tuple

from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import (
load_ssh_private_key,
load_ssh_public_key,
)

from flwr.client.client_app import ClientApp, LoadClientAppError
from flwr.common import EventType, event
from flwr.common.exit_handlers import register_exit_handlers
from flwr.common.logger import log
from flwr.common.object_ref import load_app, validate
from flwr.common.secure_aggregation.crypto.symmetric_encryption import (
ssh_types_to_elliptic_curve,
)

from ..app import _start_client_internal


def run_supernode() -> None:
Expand All @@ -41,6 +57,97 @@ def run_supernode() -> None:
)


def run_client_app() -> None:
"""Run Flower client app."""
log(INFO, "Long-running Flower client starting")

event(EventType.RUN_CLIENT_APP_ENTER)

args = _parse_args_run_client_app().parse_args()

root_certificates = _get_certificates(args)
log(
DEBUG,
"Flower will load ClientApp `%s`",
getattr(args, "client-app"),
)
load_fn = _get_load_client_app_fn(args)
authentication_keys = _try_setup_client_authentication(args)

_start_client_internal(
server_address=args.server,
load_client_app_fn=load_fn,
transport="rest" if args.rest else "grpc-rere",
root_certificates=root_certificates,
insecure=args.insecure,
authentication_keys=authentication_keys,
max_retries=args.max_retries,
max_wait_time=args.max_wait_time,
)
register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE)


def _get_certificates(args: argparse.Namespace) -> Optional[bytes]:
"""Load certificates if specified in args."""
# Obtain certificates
if args.insecure:
if args.root_certificates is not None:
sys.exit(
"Conflicting options: The '--insecure' flag disables HTTPS, "
"but '--root-certificates' was also specified. Please remove "
"the '--root-certificates' option when running in insecure mode, "
"or omit '--insecure' to use HTTPS."
)
log(
WARN,
"Option `--insecure` was set. "
"Starting insecure HTTP client connected to %s.",
args.server,
)
root_certificates = None
else:
# Load the certificates if provided, or load the system certificates
cert_path = args.root_certificates
if cert_path is None:
root_certificates = None
else:
root_certificates = Path(cert_path).read_bytes()
log(
DEBUG,
"Starting secure HTTPS client connected to %s "
"with the following certificates: %s.",
args.server,
cert_path,
)
return root_certificates


def _get_load_client_app_fn(
args: argparse.Namespace,
) -> Callable[[], ClientApp]:
"""Get the load_client_app_fn function."""
client_app_dir = args.dir
if client_app_dir is not None:
sys.path.insert(0, client_app_dir)

app_ref: str = getattr(args, "client-app")
valid, error_msg = validate(app_ref)
if not valid and error_msg:
raise LoadClientAppError(error_msg) from None

def _load() -> ClientApp:
client_app = load_app(app_ref, LoadClientAppError)

if not isinstance(client_app, ClientApp):
raise LoadClientAppError(
f"Attribute {app_ref} is not of type {ClientApp}",
) from None

return client_app

return _load


def _parse_args_run_supernode() -> argparse.ArgumentParser:
"""Parse flower-supernode command line arguments."""
parser = argparse.ArgumentParser(
Expand All @@ -57,17 +164,34 @@ def _parse_args_run_supernode() -> argparse.ArgumentParser:
"If not provided, defaults to an empty string.",
)
_parse_args_common(parser)
parser.add_argument(
"--flwr-dir",
default=None,
help="""The path containing installed Flower Apps.
By default, this value isequal to:
- `$FLWR_HOME/` if `$FLWR_HOME` is defined
- `$XDG_DATA_HOME/.flwr/` if `$XDG_DATA_HOME` is defined
- `$HOME/.flwr/` in all other cases
""",
)

return parser


def parse_args_run_client_app(parser: argparse.ArgumentParser) -> None:
"""Parse command line arguments."""
def _parse_args_run_client_app() -> argparse.ArgumentParser:
"""Parse flower-client-app command line arguments."""
parser = argparse.ArgumentParser(
description="Start a Flower client app",
)

parser.add_argument(
"client-app",
help="For example: `client:app` or `project.package.module:wrapper.app`",
)
_parse_args_common(parser)
_parse_args_common(parser=parser)

return parser


def _parse_args_common(parser: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -125,3 +249,33 @@ def _parse_args_common(parser: argparse.ArgumentParser) -> None:
help="Provide two file paths: (1) the client's private "
"key file, and (2) the client's public key file.",
)


def _try_setup_client_authentication(
args: argparse.Namespace,
) -> Optional[Tuple[ec.EllipticCurvePrivateKey, ec.EllipticCurvePublicKey]]:
if not args.authentication_keys:
return None

ssh_private_key = load_ssh_private_key(
Path(args.authentication_keys[0]).read_bytes(),
None,
)
ssh_public_key = load_ssh_public_key(Path(args.authentication_keys[1]).read_bytes())

try:
client_private_key, client_public_key = ssh_types_to_elliptic_curve(
ssh_private_key, ssh_public_key
)
except TypeError:
sys.exit(
"The file paths provided could not be read as a private and public "
"key pair. Client authentication requires an elliptic curve public and "
"private key pair. Please provide the file paths containing elliptic "
"curve private and public keys to '--authentication-keys'."
)

return (
client_private_key,
client_public_key,
)

0 comments on commit 20098cb

Please sign in to comment.