Skip to content

Commit

Permalink
break(framework) Remove flower-driver-api and flower-fleet-api (#…
Browse files Browse the repository at this point in the history
…3418)

Co-authored-by: jafermarq <[email protected]>
Co-authored-by: Daniel J. Beutel <[email protected]>
  • Loading branch information
3 people authored Jun 12, 2024
1 parent e54a33b commit 742e498
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 174 deletions.
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ exclude = [

[tool.poetry.scripts]
flwr = "flwr.cli.app:app"
flower-driver-api = "flwr.server:run_driver_api"
flower-fleet-api = "flwr.server:run_fleet_api"
flower-superlink = "flwr.server:run_superlink"
flower-supernode = "flwr.client:run_supernode"
flower-client-app = "flwr.client:run_client_app"
Expand Down
4 changes: 0 additions & 4 deletions src/py/flwr/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

from . import strategy
from . import workflow as workflow
from .app import run_driver_api as run_driver_api
from .app import run_fleet_api as run_fleet_api
from .app import run_superlink as run_superlink
from .app import start_server as start_server
from .client_manager import ClientManager as ClientManager
Expand All @@ -36,8 +34,6 @@
"Driver",
"History",
"LegacyContext",
"run_driver_api",
"run_fleet_api",
"run_server_app",
"run_superlink",
"Server",
Expand Down
169 changes: 1 addition & 168 deletions src/py/flwr/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
TRANSPORT_TYPE_REST,
)
from flwr.common.exit_handlers import register_exit_handlers
from flwr.common.logger import log, warn_deprecated_feature
from flwr.common.logger import log
from flwr.common.secure_aggregation.crypto.symmetric_encryption import (
private_key_to_bytes,
public_key_to_bytes,
Expand Down Expand Up @@ -190,139 +190,6 @@ def start_server( # pylint: disable=too-many-arguments,too-many-locals
return hist


def run_driver_api() -> None:
"""Run Flower server (Driver API)."""
log(INFO, "Starting Flower server (Driver API)")
# Running `flower-driver-api` is deprecated
warn_deprecated_feature("flower-driver-api")
log(WARN, "Use `flower-superlink` instead")
event(EventType.RUN_DRIVER_API_ENTER)
args = _parse_args_run_driver_api().parse_args()

# Parse IP address
parsed_address = parse_address(args.driver_api_address)
if not parsed_address:
sys.exit(f"Driver IP address ({args.driver_api_address}) cannot be parsed.")
host, port, is_v6 = parsed_address
address = f"[{host}]:{port}" if is_v6 else f"{host}:{port}"

# Obtain certificates
certificates = _try_obtain_certificates(args)

# Initialize StateFactory
state_factory = StateFactory(args.database)

# Start server
grpc_server: grpc.Server = run_driver_api_grpc(
address=address,
state_factory=state_factory,
certificates=certificates,
)

# Graceful shutdown
register_exit_handlers(
event_type=EventType.RUN_DRIVER_API_LEAVE,
grpc_servers=[grpc_server],
bckg_threads=[],
)

# Block
grpc_server.wait_for_termination()


# pylint: disable=too-many-locals
def run_fleet_api() -> None:
"""Run Flower server (Fleet API)."""
log(INFO, "Starting Flower server (Fleet API)")
# Running `flower-fleet-api` is deprecated
warn_deprecated_feature("flower-fleet-api")
log(WARN, "Use `flower-superlink` instead")
event(EventType.RUN_FLEET_API_ENTER)
args = _parse_args_run_fleet_api().parse_args()

# Obtain certificates
certificates = _try_obtain_certificates(args)

# Initialize StateFactory
state_factory = StateFactory(args.database)

grpc_servers = []
bckg_threads = []

address_arg = args.fleet_api_address
parsed_address = parse_address(address_arg)
if not parsed_address:
sys.exit(f"Fleet IP address ({address_arg}) cannot be parsed.")
host, port, is_v6 = parsed_address
address = f"[{host}]:{port}" if is_v6 else f"{host}:{port}"

num_workers = args.fleet_api_num_workers
if num_workers != 1:
log(
WARN,
"The Fleet API currently supports only 1 worker. "
"You have specified %d workers. "
"Support for multiple workers will be added in future releases. "
"Proceeding with a single worker.",
args.fleet_api_num_workers,
)
num_workers = 1

# Start Fleet API
if args.fleet_api_type == TRANSPORT_TYPE_REST:
if (
importlib.util.find_spec("requests")
and importlib.util.find_spec("starlette")
and importlib.util.find_spec("uvicorn")
) is None:
sys.exit(MISSING_EXTRA_REST)

_, ssl_certfile, ssl_keyfile = (
certificates if certificates is not None else (None, None, None)
)
fleet_thread = threading.Thread(
target=_run_fleet_api_rest,
args=(
host,
port,
ssl_keyfile,
ssl_certfile,
state_factory,
num_workers,
),
)
fleet_thread.start()
bckg_threads.append(fleet_thread)
elif args.fleet_api_type == TRANSPORT_TYPE_GRPC_RERE:
address_arg = args.grpc_rere_fleet_api_address
parsed_address = parse_address(address_arg)
if not parsed_address:
sys.exit(f"Fleet IP address ({address_arg}) cannot be parsed.")
host, port, is_v6 = parsed_address
address = f"[{host}]:{port}" if is_v6 else f"{host}:{port}"
fleet_server = _run_fleet_api_grpc_rere(
address=address,
state_factory=state_factory,
certificates=certificates,
)
grpc_servers.append(fleet_server)
else:
raise ValueError(f"Unknown fleet_api_type: {args.fleet_api_type}")

# Graceful shutdown
register_exit_handlers(
event_type=EventType.RUN_FLEET_API_LEAVE,
grpc_servers=grpc_servers,
bckg_threads=bckg_threads,
)

# Block
if len(grpc_servers) > 0:
grpc_servers[0].wait_for_termination()
elif len(bckg_threads) > 0:
bckg_threads[0].join()


# pylint: disable=too-many-branches, too-many-locals, too-many-statements
def run_superlink() -> None:
"""Run Flower SuperLink (Driver API and Fleet API)."""
Expand Down Expand Up @@ -661,40 +528,6 @@ def _run_fleet_api_rest(
)


def _parse_args_run_driver_api() -> argparse.ArgumentParser:
"""Parse command line arguments for Driver API."""
parser = argparse.ArgumentParser(
description="Start a Flower Driver API server. "
"This server will be responsible for "
"receiving TaskIns from the Driver script and "
"sending them to the Fleet API. Once the client nodes "
"are done, they will send the TaskRes back to this Driver API server (through"
" the Fleet API) which will then send them back to the Driver script.",
)

_add_args_common(parser=parser)
_add_args_driver_api(parser=parser)

return parser


def _parse_args_run_fleet_api() -> argparse.ArgumentParser:
"""Parse command line arguments for Fleet API."""
parser = argparse.ArgumentParser(
description="Start a Flower Fleet API server."
"This server will be responsible for "
"sending TaskIns (received from the Driver API) to the client nodes "
"and of receiving TaskRes sent back from those same client nodes once "
"they are done. Then, this Fleet API server can send those "
"TaskRes back to the Driver API.",
)

_add_args_common(parser=parser)
_add_args_fleet_api(parser=parser)

return parser


def _parse_args_run_superlink() -> argparse.ArgumentParser:
"""Parse command line arguments for both Driver API and Fleet API."""
parser = argparse.ArgumentParser(
Expand Down

0 comments on commit 742e498

Please sign in to comment.