Skip to content

Commit

Permalink
break(framework) Allow ExecServicer to access LinkState and Ffs (
Browse files Browse the repository at this point in the history
  • Loading branch information
panh99 authored Oct 24, 2024
1 parent cc61019 commit 60df4e4
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 175 deletions.
6 changes: 4 additions & 2 deletions src/py/flwr/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
)
from flwr.proto.grpcadapter_pb2_grpc import add_GrpcAdapterServicer_to_server
from flwr.superexec.app import load_executor
from flwr.superexec.exec_grpc import run_superexec_api_grpc
from flwr.superexec.exec_grpc import run_exec_api_grpc

from .client_manager import ClientManager
from .history import History
Expand Down Expand Up @@ -329,8 +329,10 @@ def run_superlink() -> None:
raise ValueError(f"Unknown fleet_api_type: {args.fleet_api_type}")

# Start Exec API
exec_server: grpc.Server = run_superexec_api_grpc(
exec_server: grpc.Server = run_exec_api_grpc(
address=exec_address,
state_factory=state_factory,
ffs_factory=ffs_factory,
executor=load_executor(args),
certificates=certificates,
config=parse_config_args(
Expand Down
141 changes: 3 additions & 138 deletions src/py/flwr/superexec/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,159 +16,24 @@

import argparse
import sys
from logging import INFO, WARN
from pathlib import Path
from typing import Optional
from logging import INFO

import grpc

from flwr.common import EventType, event, log
from flwr.common.address import parse_address
from flwr.common.config import parse_config_args
from flwr.common.constant import EXEC_API_DEFAULT_ADDRESS
from flwr.common.exit_handlers import register_exit_handlers
from flwr.common.logger import warn_deprecated_feature
from flwr.common import log
from flwr.common.object_ref import load_app, validate

from .exec_grpc import run_superexec_api_grpc
from .executor import Executor


def run_superexec() -> None:
"""Run Flower SuperExec."""
log(INFO, "Starting Flower SuperExec")

warn_deprecated_feature(
sys.exit(
"Manually launching the SuperExec is deprecated. Since `flwr 1.13.0` "
"the executor service runs in the SuperLink. Launching it manually is not "
"recommended."
)

event(EventType.RUN_SUPEREXEC_ENTER)

args = _parse_args_run_superexec().parse_args()

# Parse IP address
parsed_address = parse_address(args.address)
if not parsed_address:
sys.exit(f"SuperExec IP address ({args.address}) cannot be parsed.")
host, port, is_v6 = parsed_address
address = f"[{host}]:{port}" if is_v6 else f"{host}:{port}"

# Obtain certificates
certificates = _try_obtain_certificates(args)

# Start SuperExec API
superexec_server: grpc.Server = run_superexec_api_grpc(
address=address,
executor=load_executor(args),
certificates=certificates,
config=parse_config_args(
[args.executor_config] if args.executor_config else args.executor_config
),
)

grpc_servers = [superexec_server]

# Graceful shutdown
register_exit_handlers(
event_type=EventType.RUN_SUPEREXEC_LEAVE,
grpc_servers=grpc_servers,
bckg_threads=None,
)

superexec_server.wait_for_termination()


def _parse_args_run_superexec() -> argparse.ArgumentParser:
    """Build the argument parser used by the SuperExec entry point.

    Returns
    -------
    argparse.ArgumentParser
        A parser exposing `--address`, `--executor`, `--executor-dir`,
        `--executor-config`, `--insecure` and the three `--ssl-*` file options.
        The parser is returned unparsed; the caller invokes `parse_args()`.
    """
    executor_config_help = (
        "Key-value pairs for the executor config, separated by spaces. "
        'For example:\n\n`--executor-config \'superlink="superlink:9091" '
        'root-certificates="certificates/superlink-ca.crt"\'`'
    )
    insecure_help = (
        "Run the SuperExec without HTTPS, regardless of whether certificate "
        "paths are provided. By default, the server runs with HTTPS enabled. "
        "Use this flag only if you understand the risks."
    )

    cli = argparse.ArgumentParser(description="Start a Flower SuperExec")

    # Where the server listens and which executor it loads
    cli.add_argument(
        "--address",
        default=EXEC_API_DEFAULT_ADDRESS,
        help="SuperExec (gRPC) server address (IPv4, IPv6, or a domain name)",
    )
    cli.add_argument(
        "--executor",
        default="flwr.superexec.deployment:executor",
        help="For example: `deployment:exec` or `project.package.module:wrapper.exec`.",
    )
    cli.add_argument(
        "--executor-dir",
        default=".",
        help="The directory for the executor.",
    )
    cli.add_argument("--executor-config", help=executor_config_help)

    # Transport security
    cli.add_argument("--insecure", action="store_true", help=insecure_help)
    # The three SSL options differ only in the flag and the kind of file
    for flag, file_kind in (
        ("--ssl-certfile", "certificate"),
        ("--ssl-keyfile", "private key"),
        ("--ssl-ca-certfile", "CA certificate"),
    ):
        cli.add_argument(
            flag,
            help=f"SuperExec server SSL {file_kind} file (as a path str) "
            "to create a secure connection.",
            type=str,
            default=None,
        )

    return cli


def _try_obtain_certificates(
    args: argparse.Namespace,
) -> Optional[tuple[bytes, bytes, bytes]]:
    """Load the TLS material requested on the command line.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed arguments. Reads `insecure`, `ssl_ca_certfile`, `ssl_certfile`
        and `ssl_keyfile`.

    Returns
    -------
    Optional[tuple[bytes, bytes, bytes]]
        `None` when `--insecure` is set. Otherwise the file contents of the CA
        certificate, the server certificate and the server private key, in
        that order.

    Raises
    ------
    SystemExit
        If one of the three paths is not a file, if only some of the three
        paths are given, or if none is given and `--insecure` is not set.
    """
    if args.insecure:
        log(WARN, "Option `--insecure` was set. Starting insecure HTTP server.")
        return None

    ca_certfile = args.ssl_ca_certfile
    certfile = args.ssl_certfile
    keyfile = args.ssl_keyfile

    # All three paths are provided: validate them, then read them
    if ca_certfile and certfile and keyfile:
        # Validation order (CA, certificate, key) decides which error is
        # reported first when several paths are wrong.
        for option, path in (
            ("--ssl-ca-certfile", ca_certfile),
            ("--ssl-certfile", certfile),
            ("--ssl-keyfile", keyfile),
        ):
            if not Path(path).is_file():
                sys.exit(f"Path argument `{option}` does not point to a file.")
        return (
            Path(ca_certfile).read_bytes(),  # CA certificate
            Path(certfile).read_bytes(),  # server certificate
            Path(keyfile).read_bytes(),  # server private key
        )

    # Only some of the three paths are provided
    if ca_certfile or certfile or keyfile:
        sys.exit(
            "You need to provide valid file paths to `--ssl-certfile`, "
            "`--ssl-keyfile`, and `--ssl-ca-certfile` to create a secure "
            "connection in SuperExec server (gRPC-rere)."
        )

    # Nothing provided and `--insecure` not set
    sys.exit(
        "Certificates are required unless running in insecure mode. "
        "Please provide certificate paths to `--ssl-certfile`, "
        "`--ssl-keyfile`, and `--ssl-ca-certfile` or run the server "
        "in insecure mode using '--insecure' if you understand the risks."
    )


def load_executor(
args: argparse.Namespace,
Expand Down
59 changes: 34 additions & 25 deletions src/py/flwr/superexec/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@

from flwr.cli.install import install_from_fab
from flwr.common.constant import DRIVER_API_DEFAULT_ADDRESS
from flwr.common.grpc import create_channel
from flwr.common.logger import log
from flwr.common.serde import fab_to_proto, user_config_to_proto
from flwr.common.typing import Fab, UserConfig
from flwr.proto.driver_pb2_grpc import DriverStub
from flwr.proto.run_pb2 import CreateRunRequest # pylint: disable=E0611
from flwr.server.superlink.ffs import Ffs
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.linkstate import LinkState, LinkStateFactory

from .executor import Executor, RunTracker

Expand Down Expand Up @@ -62,7 +61,30 @@ def __init__(
self.root_certificates = root_certificates
self.root_certificates_bytes = Path(root_certificates).read_bytes()
self.flwr_dir = flwr_dir
self.stub: Optional[DriverStub] = None
self.linkstate_factory: Optional[LinkStateFactory] = None
self.ffs_factory: Optional[FfsFactory] = None

@override
def initialize(
    self, linkstate_factory: LinkStateFactory, ffs_factory: FfsFactory
) -> None:
    """Store the factories this executor uses to reach LinkState and FFS.

    Parameters
    ----------
    linkstate_factory : LinkStateFactory
        Factory kept on `self.linkstate_factory`.
    ffs_factory : FfsFactory
        Factory kept on `self.ffs_factory`.
    """
    self.linkstate_factory, self.ffs_factory = linkstate_factory, ffs_factory

@property
def linkstate(self) -> LinkState:
    """LinkState obtained from the stored factory.

    Raises
    ------
    RuntimeError
        If `self.linkstate_factory` is still `None`.
    """
    factory = self.linkstate_factory
    if factory is not None:
        return factory.state()
    raise RuntimeError("Executor is not initialized.")

@property
def ffs(self) -> Ffs:
    """Flower File Storage (FFS) obtained from the stored factory.

    Raises
    ------
    RuntimeError
        If `self.ffs_factory` is still `None`.
    """
    factory = self.ffs_factory
    if factory is not None:
        return factory.ffs()
    raise RuntimeError("Executor is not initialized.")

@override
def set_config(
Expand Down Expand Up @@ -101,32 +123,19 @@ def set_config(
raise ValueError("The `flwr-dir` value should be of type `str`.")
self.flwr_dir = str(flwr_dir)

def _connect(self) -> None:
    """Create the Driver API stub once and keep it on `self.stub`.

    Does nothing when `self.stub` is already set. Otherwise a channel to
    `self.superlink` is created, insecure only when no root certificate bytes
    are available, and wrapped in a `DriverStub`.
    """
    if self.stub is None:
        root_certs = self.root_certificates_bytes
        self.stub = DriverStub(
            create_channel(
                server_address=self.superlink,
                insecure=root_certs is None,
                root_certificates=root_certs,
            )
        )

def _create_run(
self,
fab: Fab,
override_config: UserConfig,
) -> int:
if self.stub is None:
self._connect()

assert self.stub is not None
fab_hash = self.ffs.put(fab.content, {})
if fab_hash != fab.hash_str:
raise RuntimeError(
f"FAB ({fab.hash_str}) hash from request doesn't match contents"
)

req = CreateRunRequest(
fab=fab_to_proto(fab),
override_config=user_config_to_proto(override_config),
)
res = self.stub.CreateRun(request=req)
return int(res.run_id)
run_id = self.linkstate.create_run(None, None, fab_hash, override_config)
return run_id

@override
def start_run(
Expand Down
23 changes: 15 additions & 8 deletions src/py/flwr/superexec/exec_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,40 @@
from flwr.common.logger import log
from flwr.common.typing import UserConfig
from flwr.proto.exec_pb2_grpc import add_ExecServicer_to_server
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server
from flwr.server.superlink.linkstate import LinkStateFactory

from .exec_servicer import ExecServicer
from .executor import Executor


def run_superexec_api_grpc(
# pylint: disable-next=too-many-arguments, too-many-positional-arguments
def run_exec_api_grpc(
address: str,
executor: Executor,
state_factory: LinkStateFactory,
ffs_factory: FfsFactory,
certificates: Optional[tuple[bytes, bytes, bytes]],
config: UserConfig,
) -> grpc.Server:
"""Run SuperExec API (gRPC, request-response)."""
"""Run Exec API (gRPC, request-response)."""
executor.set_config(config)

exec_servicer: grpc.Server = ExecServicer(
linkstate_factory=state_factory,
ffs_factory=ffs_factory,
executor=executor,
)
superexec_add_servicer_to_server_fn = add_ExecServicer_to_server
superexec_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(exec_servicer, superexec_add_servicer_to_server_fn),
exec_add_servicer_to_server_fn = add_ExecServicer_to_server
exec_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(exec_servicer, exec_add_servicer_to_server_fn),
server_address=address,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
certificates=certificates,
)

log(INFO, "Starting Flower SuperExec gRPC server on %s", address)
superexec_grpc_server.start()
log(INFO, "Flower Deployment Engine: Starting Exec API on %s", address)
exec_grpc_server.start()

return superexec_grpc_server
return exec_grpc_server
12 changes: 11 additions & 1 deletion src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
StreamLogsRequest,
StreamLogsResponse,
)
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.linkstate import LinkStateFactory

from .executor import Executor, RunTracker

Expand All @@ -43,8 +45,16 @@
class ExecServicer(exec_pb2_grpc.ExecServicer):
"""SuperExec API servicer."""

def __init__(self, executor: Executor) -> None:
def __init__(
self,
linkstate_factory: LinkStateFactory,
ffs_factory: FfsFactory,
executor: Executor,
) -> None:
self.linkstate_factory = linkstate_factory
self.ffs_factory = ffs_factory
self.executor = executor
self.executor.initialize(linkstate_factory, ffs_factory)
self.runs: dict[int, RunTracker] = {}

def StartRun(
Expand Down
2 changes: 1 addition & 1 deletion src/py/flwr/superexec/exec_servicer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_start_run() -> None:
request.fab.content = b"test"

# Create an instance of ExecServicer
servicer = ExecServicer(executor=executor)
servicer = ExecServicer(Mock(), Mock(), executor=executor)

# Execute
response = servicer.StartRun(request, context_mock)
Expand Down
19 changes: 19 additions & 0 deletions src/py/flwr/superexec/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from typing import Optional

from flwr.common.typing import UserConfig
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.linkstate import LinkStateFactory


@dataclass
Expand All @@ -34,6 +36,23 @@ class RunTracker:
class Executor(ABC):
"""Execute and monitor a Flower run."""

@abstractmethod
def initialize(
self, linkstate_factory: LinkStateFactory, ffs_factory: FfsFactory
) -> None:
"""Initialize the executor with the necessary factories.

This method sets up the executor by providing it with the factories required
to access the LinkState and the Flower File Storage (FFS) in the SuperLink.

Parameters
----------
linkstate_factory : LinkStateFactory
    The factory to create access to the LinkState.
ffs_factory : FfsFactory
    The factory to create access to the Flower File Storage (FFS).
"""

@abstractmethod
def set_config(
self,
Expand Down
Loading

0 comments on commit 60df4e4

Please sign in to comment.