From 60df4e4aadad84269d0f0019a3d04e7bd94a278a Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Thu, 24 Oct 2024 19:59:29 +0100 Subject: [PATCH] break(framework) Allow `ExecServicer` to access `LinkState` and `Ffs` (#4352) --- src/py/flwr/server/app.py | 6 +- src/py/flwr/superexec/app.py | 141 +------------------- src/py/flwr/superexec/deployment.py | 59 ++++---- src/py/flwr/superexec/exec_grpc.py | 23 ++-- src/py/flwr/superexec/exec_servicer.py | 12 +- src/py/flwr/superexec/exec_servicer_test.py | 2 +- src/py/flwr/superexec/executor.py | 19 +++ src/py/flwr/superexec/simulation.py | 8 ++ 8 files changed, 95 insertions(+), 175 deletions(-) diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index 805affd50813..03ceb878f33b 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -64,7 +64,7 @@ ) from flwr.proto.grpcadapter_pb2_grpc import add_GrpcAdapterServicer_to_server from flwr.superexec.app import load_executor -from flwr.superexec.exec_grpc import run_superexec_api_grpc +from flwr.superexec.exec_grpc import run_exec_api_grpc from .client_manager import ClientManager from .history import History @@ -329,8 +329,10 @@ def run_superlink() -> None: raise ValueError(f"Unknown fleet_api_type: {args.fleet_api_type}") # Start Exec API - exec_server: grpc.Server = run_superexec_api_grpc( + exec_server: grpc.Server = run_exec_api_grpc( address=exec_address, + state_factory=state_factory, + ffs_factory=ffs_factory, executor=load_executor(args), certificates=certificates, config=parse_config_args( diff --git a/src/py/flwr/superexec/app.py b/src/py/flwr/superexec/app.py index 4dcdfeefc4c9..b9ef88e3c05d 100644 --- a/src/py/flwr/superexec/app.py +++ b/src/py/flwr/superexec/app.py @@ -16,21 +16,11 @@ import argparse import sys -from logging import INFO, WARN -from pathlib import Path -from typing import Optional +from logging import INFO -import grpc - -from flwr.common import EventType, event, log -from flwr.common.address import parse_address -from flwr.common.config import parse_config_args -from flwr.common.constant import EXEC_API_DEFAULT_ADDRESS -from flwr.common.exit_handlers import register_exit_handlers -from flwr.common.logger import warn_deprecated_feature +from flwr.common import log from flwr.common.object_ref import load_app, validate -from .exec_grpc import run_superexec_api_grpc from .executor import Executor @@ -38,137 +28,12 @@ def run_superexec() -> None: """Run Flower SuperExec.""" log(INFO, "Starting Flower SuperExec") - warn_deprecated_feature( + sys.exit( "Manually launching the SuperExec is deprecated. Since `flwr 1.13.0` " "the executor service runs in the SuperLink. Launching it manually is not " "recommended." ) - event(EventType.RUN_SUPEREXEC_ENTER) - - args = _parse_args_run_superexec().parse_args() - - # Parse IP address - parsed_address = parse_address(args.address) - if not parsed_address: - sys.exit(f"SuperExec IP address ({args.address}) cannot be parsed.") - host, port, is_v6 = parsed_address - address = f"[{host}]:{port}" if is_v6 else f"{host}:{port}" - - # Obtain certificates - certificates = _try_obtain_certificates(args) - - # Start SuperExec API - superexec_server: grpc.Server = run_superexec_api_grpc( - address=address, - executor=load_executor(args), - certificates=certificates, - config=parse_config_args( - [args.executor_config] if args.executor_config else args.executor_config - ), - ) - - grpc_servers = [superexec_server] - - # Graceful shutdown - register_exit_handlers( - event_type=EventType.RUN_SUPEREXEC_LEAVE, - grpc_servers=grpc_servers, - bckg_threads=None, - ) - - superexec_server.wait_for_termination() - - -def _parse_args_run_superexec() -> argparse.ArgumentParser: - """Parse command line arguments for SuperExec.""" - parser = argparse.ArgumentParser( - description="Start a Flower SuperExec", - ) - parser.add_argument( - "--address", - help="SuperExec (gRPC) server address (IPv4, IPv6, or a domain name)", - default=EXEC_API_DEFAULT_ADDRESS, - ) - parser.add_argument( - "--executor", - help="For example: `deployment:exec` or `project.package.module:wrapper.exec`.", - default="flwr.superexec.deployment:executor", - ) - parser.add_argument( - "--executor-dir", - help="The directory for the executor.", - default=".", - ) - parser.add_argument( - "--executor-config", - help="Key-value pairs for the executor config, separated by spaces. " - 'For example:\n\n`--executor-config \'superlink="superlink:9091" ' - 'root-certificates="certificates/superlink-ca.crt"\'`', - ) - parser.add_argument( - "--insecure", - action="store_true", - help="Run the SuperExec without HTTPS, regardless of whether certificate " - "paths are provided. By default, the server runs with HTTPS enabled. " - "Use this flag only if you understand the risks.", - ) - parser.add_argument( - "--ssl-certfile", - help="SuperExec server SSL certificate file (as a path str) " - "to create a secure connection.", - type=str, - default=None, - ) - parser.add_argument( - "--ssl-keyfile", - help="SuperExec server SSL private key file (as a path str) " - "to create a secure connection.", - type=str, - ) - parser.add_argument( - "--ssl-ca-certfile", - help="SuperExec server SSL CA certificate file (as a path str) " - "to create a secure connection.", - type=str, - ) - return parser - - -def _try_obtain_certificates( - args: argparse.Namespace, -) -> Optional[tuple[bytes, bytes, bytes]]: - # Obtain certificates - if args.insecure: - log(WARN, "Option `--insecure` was set. Starting insecure HTTP server.") - return None - # Check if certificates are provided - if args.ssl_certfile and args.ssl_keyfile and args.ssl_ca_certfile: - if not Path(args.ssl_ca_certfile).is_file(): - sys.exit("Path argument `--ssl-ca-certfile` does not point to a file.") - if not Path(args.ssl_certfile).is_file(): - sys.exit("Path argument `--ssl-certfile` does not point to a file.") - if not Path(args.ssl_keyfile).is_file(): - sys.exit("Path argument `--ssl-keyfile` does not point to a file.") - certificates = ( - Path(args.ssl_ca_certfile).read_bytes(), # CA certificate - Path(args.ssl_certfile).read_bytes(), # server certificate - Path(args.ssl_keyfile).read_bytes(), # server private key - ) - return certificates - if args.ssl_certfile or args.ssl_keyfile or args.ssl_ca_certfile: - sys.exit( - "You need to provide valid file paths to `--ssl-certfile`, " - "`--ssl-keyfile`, and `—-ssl-ca-certfile` to create a secure " - "connection in SuperExec server (gRPC-rere)." - ) - sys.exit( - "Certificates are required unless running in insecure mode. " - "Please provide certificate paths to `--ssl-certfile`, " - "`--ssl-keyfile`, and `—-ssl-ca-certfile` or run the server " - "in insecure mode using '--insecure' if you understand the risks." - ) - def load_executor( args: argparse.Namespace, diff --git a/src/py/flwr/superexec/deployment.py b/src/py/flwr/superexec/deployment.py index 331fd817228e..07a733e133bb 100644 --- a/src/py/flwr/superexec/deployment.py +++ b/src/py/flwr/superexec/deployment.py @@ -24,12 +24,11 @@ from flwr.cli.install import install_from_fab from flwr.common.constant import DRIVER_API_DEFAULT_ADDRESS -from flwr.common.grpc import create_channel from flwr.common.logger import log -from flwr.common.serde import fab_to_proto, user_config_to_proto from flwr.common.typing import Fab, UserConfig -from flwr.proto.driver_pb2_grpc import DriverStub -from flwr.proto.run_pb2 import CreateRunRequest # pylint: disable=E0611 +from flwr.server.superlink.ffs import Ffs +from flwr.server.superlink.ffs.ffs_factory import FfsFactory +from flwr.server.superlink.linkstate import LinkState, LinkStateFactory from .executor import Executor, RunTracker @@ -62,7 +61,30 @@ def __init__( self.root_certificates = root_certificates self.root_certificates_bytes = Path(root_certificates).read_bytes() self.flwr_dir = flwr_dir - self.stub: Optional[DriverStub] = None + self.linkstate_factory: Optional[LinkStateFactory] = None + self.ffs_factory: Optional[FfsFactory] = None + + @override + def initialize( + self, linkstate_factory: LinkStateFactory, ffs_factory: FfsFactory + ) -> None: + """Initialize the executor with the necessary factories.""" + self.linkstate_factory = linkstate_factory + self.ffs_factory = ffs_factory + + @property + def linkstate(self) -> LinkState: + """Return the LinkState.""" + if self.linkstate_factory is None: + raise RuntimeError("Executor is not initialized.") + return self.linkstate_factory.state() + + @property + def ffs(self) -> Ffs: + """Return the Flower File Storage (FFS).""" + if self.ffs_factory is None: + raise RuntimeError("Executor is not initialized.") + return self.ffs_factory.ffs() @override def set_config( @@ -101,32 +123,19 @@ def set_config( raise ValueError("The `flwr-dir` value should be of type `str`.") self.flwr_dir = str(flwr_dir) - def _connect(self) -> None: - if self.stub is not None: - return - channel = create_channel( - server_address=self.superlink, - insecure=(self.root_certificates_bytes is None), - root_certificates=self.root_certificates_bytes, - ) - self.stub = DriverStub(channel) - def _create_run( self, fab: Fab, override_config: UserConfig, ) -> int: - if self.stub is None: - self._connect() - - assert self.stub is not None + fab_hash = self.ffs.put(fab.content, {}) + if fab_hash != fab.hash_str: + raise RuntimeError( + f"FAB ({fab.hash_str}) hash from request doesn't match contents" + ) - req = CreateRunRequest( - fab=fab_to_proto(fab), - override_config=user_config_to_proto(override_config), - ) - res = self.stub.CreateRun(request=req) - return int(res.run_id) + run_id = self.linkstate.create_run(None, None, fab_hash, override_config) + return run_id @override def start_run( diff --git a/src/py/flwr/superexec/exec_grpc.py b/src/py/flwr/superexec/exec_grpc.py index 017395bc8002..f8c9722ba5ac 100644 --- a/src/py/flwr/superexec/exec_grpc.py +++ b/src/py/flwr/superexec/exec_grpc.py @@ -23,33 +23,40 @@ from flwr.common.logger import log from flwr.common.typing import UserConfig from flwr.proto.exec_pb2_grpc import add_ExecServicer_to_server +from flwr.server.superlink.ffs.ffs_factory import FfsFactory from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server +from flwr.server.superlink.linkstate import LinkStateFactory from .exec_servicer import ExecServicer from .executor import Executor -def run_superexec_api_grpc( +# pylint: disable-next=too-many-arguments, too-many-positional-arguments +def run_exec_api_grpc( address: str, executor: Executor, + state_factory: LinkStateFactory, + ffs_factory: FfsFactory, certificates: Optional[tuple[bytes, bytes, bytes]], config: UserConfig, ) -> grpc.Server: - """Run SuperExec API (gRPC, request-response).""" + """Run Exec API (gRPC, request-response).""" executor.set_config(config) exec_servicer: grpc.Server = ExecServicer( + linkstate_factory=state_factory, + ffs_factory=ffs_factory, executor=executor, ) - superexec_add_servicer_to_server_fn = add_ExecServicer_to_server - superexec_grpc_server = generic_create_grpc_server( - servicer_and_add_fn=(exec_servicer, superexec_add_servicer_to_server_fn), + exec_add_servicer_to_server_fn = add_ExecServicer_to_server + exec_grpc_server = generic_create_grpc_server( + servicer_and_add_fn=(exec_servicer, exec_add_servicer_to_server_fn), server_address=address, max_message_length=GRPC_MAX_MESSAGE_LENGTH, certificates=certificates, ) - log(INFO, "Starting Flower SuperExec gRPC server on %s", address) - superexec_grpc_server.start() + log(INFO, "Flower Deployment Engine: Starting Exec API on %s", address) + exec_grpc_server.start() - return superexec_grpc_server + return exec_grpc_server diff --git a/src/py/flwr/superexec/exec_servicer.py b/src/py/flwr/superexec/exec_servicer.py index ebb12b5ddbd2..14c1a3548047 100644 --- a/src/py/flwr/superexec/exec_servicer.py +++ b/src/py/flwr/superexec/exec_servicer.py @@ -34,6 +34,8 @@ StreamLogsRequest, StreamLogsResponse, ) +from flwr.server.superlink.ffs.ffs_factory import FfsFactory +from flwr.server.superlink.linkstate import LinkStateFactory from .executor import Executor, RunTracker @@ -43,8 +45,16 @@ class ExecServicer(exec_pb2_grpc.ExecServicer): """SuperExec API servicer.""" - def __init__(self, executor: Executor) -> None: + def __init__( + self, + linkstate_factory: LinkStateFactory, + ffs_factory: FfsFactory, + executor: Executor, + ) -> None: + self.linkstate_factory = linkstate_factory + self.ffs_factory = ffs_factory self.executor = executor + self.executor.initialize(linkstate_factory, ffs_factory) self.runs: dict[int, RunTracker] = {} def StartRun( diff --git a/src/py/flwr/superexec/exec_servicer_test.py b/src/py/flwr/superexec/exec_servicer_test.py index b777bc806fe5..6044895de3cf 100644 --- a/src/py/flwr/superexec/exec_servicer_test.py +++ b/src/py/flwr/superexec/exec_servicer_test.py @@ -44,7 +44,7 @@ def test_start_run() -> None: request.fab.content = b"test" # Create a instance of FlowerServiceServicer - servicer = ExecServicer(executor=executor) + servicer = ExecServicer(Mock(), Mock(), executor=executor) # Execute response = servicer.StartRun(request, context_mock) diff --git a/src/py/flwr/superexec/executor.py b/src/py/flwr/superexec/executor.py index 08b66a438e4d..a36e1dec0fd2 100644 --- a/src/py/flwr/superexec/executor.py +++ b/src/py/flwr/superexec/executor.py @@ -20,6 +20,8 @@ from typing import Optional from flwr.common.typing import UserConfig +from flwr.server.superlink.ffs.ffs_factory import FfsFactory +from flwr.server.superlink.linkstate import LinkStateFactory @dataclass @@ -34,6 +36,23 @@ class RunTracker: class Executor(ABC): """Execute and monitor a Flower run.""" + @abstractmethod + def initialize( + self, linkstate_factory: LinkStateFactory, ffs_factory: FfsFactory + ) -> None: + """Initialize the executor with the necessary factories. + + This method sets up the executor by providing it with the factories required + to access the LinkState and the Flower File Storage (FFS) in the SuperLink. + + Parameters + ---------- + linkstate_factory : LinkStateFactory + The factory to create access to the LinkState. + ffs_factory : FfsFactory + The factory to create access to the Flower File Storage (FFS). + """ + @abstractmethod def set_config( self, diff --git a/src/py/flwr/superexec/simulation.py b/src/py/flwr/superexec/simulation.py index 820d80a89ac7..83ea0d0681a1 100644 --- a/src/py/flwr/superexec/simulation.py +++ b/src/py/flwr/superexec/simulation.py @@ -29,6 +29,8 @@ from flwr.common.constant import RUN_ID_NUM_BYTES from flwr.common.logger import log from flwr.common.typing import UserConfig +from flwr.server.superlink.ffs.ffs_factory import FfsFactory +from flwr.server.superlink.linkstate import LinkStateFactory from flwr.server.superlink.linkstate.utils import generate_rand_int_from_bytes from .executor import Executor, RunTracker @@ -70,6 +72,12 @@ def __init__( self.num_supernodes = num_supernodes self.verbose = verbose + @override + def initialize( + self, linkstate_factory: LinkStateFactory, ffs_factory: FfsFactory + ) -> None: + """Initialize the executor with the necessary factories.""" + @override def set_config( self,