From 742e49886753b613c577b2117e4acab7be9da9f3 Mon Sep 17 00:00:00 2001 From: Daniel Nata Nugraha Date: Wed, 12 Jun 2024 16:04:47 +0200 Subject: [PATCH] break(framework) Remove `flower-driver-api` and `flower-fleet-api` (#3418) Co-authored-by: jafermarq Co-authored-by: Daniel J. Beutel --- pyproject.toml | 2 - src/py/flwr/server/__init__.py | 4 - src/py/flwr/server/app.py | 169 +-------------------------------- 3 files changed, 1 insertion(+), 174 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a1a21a6b94cf..62e17aeb5281 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,8 +53,6 @@ exclude = [ [tool.poetry.scripts] flwr = "flwr.cli.app:app" -flower-driver-api = "flwr.server:run_driver_api" -flower-fleet-api = "flwr.server:run_fleet_api" flower-superlink = "flwr.server:run_superlink" flower-supernode = "flwr.client:run_supernode" flower-client-app = "flwr.client:run_client_app" diff --git a/src/py/flwr/server/__init__.py b/src/py/flwr/server/__init__.py index 875f66c43d03..19c6034bcaa1 100644 --- a/src/py/flwr/server/__init__.py +++ b/src/py/flwr/server/__init__.py @@ -17,8 +17,6 @@ from . import strategy from . import workflow as workflow -from .app import run_driver_api as run_driver_api -from .app import run_fleet_api as run_fleet_api from .app import run_superlink as run_superlink from .app import start_server as start_server from .client_manager import ClientManager as ClientManager @@ -36,8 +34,6 @@ "Driver", "History", "LegacyContext", - "run_driver_api", - "run_fleet_api", "run_server_app", "run_superlink", "Server", diff --git a/src/py/flwr/server/app.py b/src/py/flwr/server/app.py index c050c983134b..cbb18b602fcd 100644 --- a/src/py/flwr/server/app.py +++ b/src/py/flwr/server/app.py @@ -40,7 +40,7 @@ TRANSPORT_TYPE_REST, ) from flwr.common.exit_handlers import register_exit_handlers -from flwr.common.logger import log, warn_deprecated_feature +from flwr.common.logger import log from flwr.common.secure_aggregation.crypto.symmetric_encryption import ( private_key_to_bytes, public_key_to_bytes, @@ -190,139 +190,6 @@ def start_server( # pylint: disable=too-many-arguments,too-many-locals return hist -def run_driver_api() -> None: - """Run Flower server (Driver API).""" - log(INFO, "Starting Flower server (Driver API)") - # Running `flower-driver-api` is deprecated - warn_deprecated_feature("flower-driver-api") - log(WARN, "Use `flower-superlink` instead") - event(EventType.RUN_DRIVER_API_ENTER) - args = _parse_args_run_driver_api().parse_args() - - # Parse IP address - parsed_address = parse_address(args.driver_api_address) - if not parsed_address: - sys.exit(f"Driver IP address ({args.driver_api_address}) cannot be parsed.") - host, port, is_v6 = parsed_address - address = f"[{host}]:{port}" if is_v6 else f"{host}:{port}" - - # Obtain certificates - certificates = _try_obtain_certificates(args) - - # Initialize StateFactory - state_factory = StateFactory(args.database) - - # Start server - grpc_server: grpc.Server = run_driver_api_grpc( - address=address, - state_factory=state_factory, - certificates=certificates, - ) - - # Graceful shutdown - register_exit_handlers( - event_type=EventType.RUN_DRIVER_API_LEAVE, - grpc_servers=[grpc_server], - bckg_threads=[], - ) - - # Block - grpc_server.wait_for_termination() - - -# pylint: disable=too-many-locals -def run_fleet_api() -> None: - """Run Flower server (Fleet API).""" - log(INFO, "Starting Flower server (Fleet API)") - # Running `flower-fleet-api` is deprecated - warn_deprecated_feature("flower-fleet-api") - log(WARN, "Use `flower-superlink` instead") - event(EventType.RUN_FLEET_API_ENTER) - args = _parse_args_run_fleet_api().parse_args() - - # Obtain certificates - certificates = _try_obtain_certificates(args) - - # Initialize StateFactory - state_factory = StateFactory(args.database) - - grpc_servers = [] - bckg_threads = [] - - address_arg = args.fleet_api_address - parsed_address = parse_address(address_arg) - if not parsed_address: - sys.exit(f"Fleet IP address ({address_arg}) cannot be parsed.") - host, port, is_v6 = parsed_address - address = f"[{host}]:{port}" if is_v6 else f"{host}:{port}" - - num_workers = args.fleet_api_num_workers - if num_workers != 1: - log( - WARN, - "The Fleet API currently supports only 1 worker. " - "You have specified %d workers. " - "Support for multiple workers will be added in future releases. " - "Proceeding with a single worker.", - args.fleet_api_num_workers, - ) - num_workers = 1 - - # Start Fleet API - if args.fleet_api_type == TRANSPORT_TYPE_REST: - if ( - importlib.util.find_spec("requests") - and importlib.util.find_spec("starlette") - and importlib.util.find_spec("uvicorn") - ) is None: - sys.exit(MISSING_EXTRA_REST) - - _, ssl_certfile, ssl_keyfile = ( - certificates if certificates is not None else (None, None, None) - ) - fleet_thread = threading.Thread( - target=_run_fleet_api_rest, - args=( - host, - port, - ssl_keyfile, - ssl_certfile, - state_factory, - num_workers, - ), - ) - fleet_thread.start() - bckg_threads.append(fleet_thread) - elif args.fleet_api_type == TRANSPORT_TYPE_GRPC_RERE: - address_arg = args.grpc_rere_fleet_api_address - parsed_address = parse_address(address_arg) - if not parsed_address: - sys.exit(f"Fleet IP address ({address_arg}) cannot be parsed.") - host, port, is_v6 = parsed_address - address = f"[{host}]:{port}" if is_v6 else f"{host}:{port}" - fleet_server = _run_fleet_api_grpc_rere( - address=address, - state_factory=state_factory, - certificates=certificates, - ) - grpc_servers.append(fleet_server) - else: - raise ValueError(f"Unknown fleet_api_type: {args.fleet_api_type}") - - # Graceful shutdown - register_exit_handlers( - event_type=EventType.RUN_FLEET_API_LEAVE, - grpc_servers=grpc_servers, - bckg_threads=bckg_threads, - ) - - # Block - if len(grpc_servers) > 0: - grpc_servers[0].wait_for_termination() - elif len(bckg_threads) > 0: - bckg_threads[0].join() - - # pylint: disable=too-many-branches, too-many-locals, too-many-statements def run_superlink() -> None: """Run Flower SuperLink (Driver API and Fleet API).""" @@ -661,40 +528,6 @@ def _run_fleet_api_rest( ) -def _parse_args_run_driver_api() -> argparse.ArgumentParser: - """Parse command line arguments for Driver API.""" - parser = argparse.ArgumentParser( - description="Start a Flower Driver API server. " - "This server will be responsible for " - "receiving TaskIns from the Driver script and " - "sending them to the Fleet API. Once the client nodes " - "are done, they will send the TaskRes back to this Driver API server (through" - " the Fleet API) which will then send them back to the Driver script.", - ) - - _add_args_common(parser=parser) - _add_args_driver_api(parser=parser) - - return parser - - -def _parse_args_run_fleet_api() -> argparse.ArgumentParser: - """Parse command line arguments for Fleet API.""" - parser = argparse.ArgumentParser( - description="Start a Flower Fleet API server." - "This server will be responsible for " - "sending TaskIns (received from the Driver API) to the client nodes " - "and of receiving TaskRes sent back from those same client nodes once " - "they are done. Then, this Fleet API server can send those " - "TaskRes back to the Driver API.", - ) - - _add_args_common(parser=parser) - _add_args_fleet_api(parser=parser) - - return parser - - def _parse_args_run_superlink() -> argparse.ArgumentParser: """Parse command line arguments for both Driver API and Fleet API.""" parser = argparse.ArgumentParser(