Skip to content

Commit

Permalink
Merge branch 'main' into update-xgb-tutorial
Browse files Browse the repository at this point in the history
  • Loading branch information
flwrmachine authored Oct 29, 2024
2 parents 8d1e841 + 0c06469 commit a56e428
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 44 deletions.
4 changes: 2 additions & 2 deletions e2e/test_reconnection.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ esac

dir_arg="./.."

timeout 2m flower-superlink --insecure $db_arg $rest_arg &
timeout 2m flower-superlink --insecure $db_arg $rest_arg --isolation="process"&
sl_pid=$!
echo "Starting SuperLink"
sleep 3
Expand All @@ -45,7 +45,7 @@ echo "Killing Superlink"
sleep 3

# Restart superlink, the clients should now be able to reconnect to it
timeout 2m flower-superlink --insecure $db_arg $rest_arg &
timeout 2m flower-superlink --insecure $db_arg $rest_arg --isolation="process" &
sl_pid=$!
echo "Restarting Superlink"
sleep 20
Expand Down
2 changes: 1 addition & 1 deletion e2e/test_superlink.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case "$2" in
;;
esac

timeout 2m flower-superlink $server_arg $db_arg $rest_arg_superlink $server_auth &
timeout 2m flower-superlink $server_arg $db_arg $rest_arg_superlink $server_auth --isolation="process" &
sl_pid=$!
sleep 3

Expand Down
125 changes: 119 additions & 6 deletions src/py/flwr/server/serverapp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,38 @@

import argparse
import sys
from logging import DEBUG, INFO, WARN
from logging import DEBUG, ERROR, INFO, WARN
from os.path import isfile
from pathlib import Path
from time import sleep
from typing import Optional

from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.common.config import (
get_flwr_dir,
get_fused_config_from_dir,
get_project_config,
get_project_dir,
)
from flwr.common.constant import Status, SubStatus
from flwr.common.logger import log
from flwr.common.serde import (
context_from_proto,
context_to_proto,
fab_from_proto,
run_from_proto,
run_status_to_proto,
)
from flwr.common.typing import RunStatus
from flwr.proto.driver_pb2 import ( # pylint: disable=E0611
PullServerAppInputsRequest,
PullServerAppInputsResponse,
PushServerAppOutputsRequest,
)
from flwr.proto.run_pb2 import UpdateRunStatusRequest # pylint: disable=E0611
from flwr.server.driver.grpc_driver import GrpcDriver
from flwr.server.run_serverapp import run as run_


def flwr_serverapp() -> None:
Expand Down Expand Up @@ -121,20 +146,108 @@ def _try_obtain_certificates(
return root_certificates


def run_serverapp( # pylint: disable=R0914
def run_serverapp( # pylint: disable=R0914, disable=W0212
superlink: str,
run_id: Optional[int] = None,
flwr_dir_: Optional[str] = None,
certificates: Optional[bytes] = None,
) -> None:
"""Run Flower ServerApp process."""
_ = GrpcDriver(
driver = GrpcDriver(
driver_service_address=superlink,
root_certificates=certificates,
)

log(INFO, "%s, %d", flwr_dir_, run_id)
# Resolve directory where FABs are installed
flwr_dir = get_flwr_dir(flwr_dir_)

# Then, GetServerInputs
only_once = run_id is not None

# Then, run ServerApp
while True:

try:
# Pull ServerAppInputs from LinkState
req = (
PullServerAppInputsRequest(run_id=run_id)
if run_id
else PullServerAppInputsRequest()
)
res: PullServerAppInputsResponse = driver._stub.PullServerAppInputs(req)
if not res.HasField("run"):
sleep(3)
run_status = None
continue

context = context_from_proto(res.context)
run = run_from_proto(res.run)
fab = fab_from_proto(res.fab)

driver.init_run(run.run_id)

log(DEBUG, "ServerApp process starts FAB installation.")
install_from_fab(fab.content, flwr_dir=flwr_dir, skip_prompt=True)

fab_id, fab_version = get_fab_metadata(fab.content)

app_path = str(get_project_dir(fab_id, fab_version, fab.hash_str, flwr_dir))
config = get_project_config(app_path)

# Obtain server app reference and the run config
server_app_attr = config["tool"]["flwr"]["app"]["components"]["serverapp"]
server_app_run_config = get_fused_config_from_dir(
Path(app_path), run.override_config
)

# Update run_config in context
context.run_config = server_app_run_config

log(
DEBUG,
"Flower will load ServerApp `%s` in %s",
server_app_attr,
app_path,
)

# Change status to Running
run_status_proto = run_status_to_proto(RunStatus(Status.RUNNING, "", ""))
driver._stub.UpdateRunStatus(
UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto)
)

# Load and run the ServerApp with the Driver
updated_context = run_(
driver=driver,
server_app_dir=app_path,
server_app_attr=server_app_attr,
context=context,
)

# Send resulting context
context_proto = context_to_proto(updated_context)
out_req = PushServerAppOutputsRequest(
run_id=run.run_id, context=context_proto
)
_ = driver._stub.PushServerAppOutputs(out_req)

run_status = RunStatus(Status.FINISHED, SubStatus.COMPLETED, "")

except Exception as ex: # pylint: disable=broad-exception-caught
exc_entity = "ServerApp"
log(ERROR, "%s raised an exception", exc_entity, exc_info=ex)
run_status = RunStatus(Status.FINISHED, SubStatus.FAILED, str(ex))

finally:
if run_status:
run_status_proto = run_status_to_proto(run_status)
driver._stub.UpdateRunStatus(
UpdateRunStatusRequest(
run_id=run.run_id, run_status=run_status_proto
)
)

# Stop the loop if `flwr-serverapp` is expected to process a single run
if only_once:
break

# Reset the run_id
run_id = None
36 changes: 1 addition & 35 deletions src/py/flwr/superexec/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
"""Deployment engine executor."""

import hashlib
import subprocess
from logging import ERROR, INFO
from pathlib import Path
from typing import Optional

from typing_extensions import override

from flwr.cli.install import install_from_fab
from flwr.common.constant import DRIVER_API_DEFAULT_ADDRESS
from flwr.common.logger import log
from flwr.common.typing import Fab, UserConfig
Expand Down Expand Up @@ -146,46 +144,14 @@ def start_run(
) -> Optional[RunTracker]:
"""Start run using the Flower Deployment Engine."""
try:
# Install FAB to flwr dir
install_from_fab(fab_file, None, True)

# Call SuperLink to create run
run_id: int = self._create_run(
Fab(hashlib.sha256(fab_file).hexdigest(), fab_file), override_config
)
log(INFO, "Created run %s", str(run_id))

command = [
"flower-server-app",
"--run-id",
str(run_id),
"--superlink",
str(self.superlink),
]

if self.flwr_dir:
command.append("--flwr-dir")
command.append(self.flwr_dir)

if self.root_certificates is None:
command.append("--insecure")
else:
command.append("--root-certificates")
command.append(self.root_certificates)

# Execute the command
proc = subprocess.Popen( # pylint: disable=consider-using-with
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
log(INFO, "Started run %s", str(run_id))

return RunTracker(
run_id=run_id,
proc=proc,
)
return None
# pylint: disable-next=broad-except
except Exception as e:
log(ERROR, "Could not start run: %s", str(e))
Expand Down

0 comments on commit a56e428

Please sign in to comment.