Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(framework) Run app with flwr run calling flower-simulation #3819

Merged
merged 37 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
679a783
init
jafermarq Jul 15, 2024
29ae904
Merge branch 'main' into support-fab-in-flower-simulation
jafermarq Jul 15, 2024
6b7e88d
fix args checks
jafermarq Jul 15, 2024
4d4b049
basic functionality; no override; no from .fab
jafermarq Jul 15, 2024
6d99d90
feat(framework:skip) Add config function for fusing dicts
charlesbvll Jul 16, 2024
8430e28
simplify
jafermarq Jul 16, 2024
97c0614
Merge branch 'add-config-function-overrides' into support-fab-in-flow…
jafermarq Jul 16, 2024
11086ba
using fused config -- but not in `ClientApp` yet
jafermarq Jul 16, 2024
e709b08
Merge branch 'main' into support-fab-in-flower-simulation
jafermarq Jul 16, 2024
46244de
updates
jafermarq Jul 16, 2024
da0d8f1
init
jafermarq Jul 16, 2024
e2d2347
fix
jafermarq Jul 16, 2024
122cb31
Apply suggestions from code review
jafermarq Jul 16, 2024
307c29e
updates
jafermarq Jul 16, 2024
952572c
Merge branch 'main' into register-context-using-fab-dir
jafermarq Jul 16, 2024
3ec626d
updates
jafermarq Jul 16, 2024
f65dcbf
Merge branch 'register-context-using-fab-dir' into support-fab-in-flo…
jafermarq Jul 16, 2024
909d410
clientapp gets runconfig from `pyproject.toml`
jafermarq Jul 16, 2024
07e4d3e
Merge branch 'main' into support-fab-in-flower-simulation
danieljanes Jul 16, 2024
7d1c429
Merge branch 'main' into support-fab-in-flower-simulation
danieljanes Jul 16, 2024
f4a10ed
updates from review
jafermarq Jul 16, 2024
1b09bb7
init
jafermarq Jul 16, 2024
9bf5fa3
Update src/py/flwr/cli/run/run.py
jafermarq Jul 16, 2024
1fbf1e0
w/ previous
jafermarq Jul 16, 2024
a74894a
Merge branch 'main' into make-native-flwr-run-use-flower-simulation
jafermarq Jul 16, 2024
571b9ee
Update src/py/flwr/simulation/run_simulation.py
danieljanes Jul 16, 2024
109f521
Update src/py/flwr/simulation/run_simulation.py
danieljanes Jul 16, 2024
211d838
Update src/py/flwr/simulation/run_simulation.py
danieljanes Jul 16, 2024
acc533c
Update src/py/flwr/simulation/run_simulation.py
danieljanes Jul 16, 2024
241d1e6
Merge branch 'main' into make-native-flwr-run-use-flower-simulation
danieljanes Jul 16, 2024
5cfe384
revert `run_simulation.py` changes
jafermarq Jul 16, 2024
cb6924b
basic run process
jafermarq Jul 16, 2024
587d1b7
check
jafermarq Jul 16, 2024
ff71787
pass config-override to `flower-simulation
jafermarq Jul 16, 2024
67d86a2
Add to
danieljanes Jul 16, 2024
ab7ca15
Merge branch 'make-native-flwr-run-use-flower-simulation' of github.c…
danieljanes Jul 16, 2024
4b3199a
correct handling configs override
jafermarq Jul 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/py/flwr/cli/run/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# ==============================================================================
"""Flower command line interface `run` command."""

import subprocess
import sys
from logging import DEBUG
from pathlib import Path
Expand All @@ -29,7 +30,6 @@
from flwr.common.logger import log
from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611
from flwr.proto.exec_pb2_grpc import ExecStub
from flwr.simulation.run_simulation import _run_simulation


# pylint: disable-next=too-many-locals
Expand Down Expand Up @@ -107,7 +107,7 @@ def run(
if "address" in federation:
_run_with_superexec(federation, directory, config_overrides)
else:
_run_without_superexec(config, federation, federation_name)
_run_without_superexec(directory, federation, federation_name)


def _run_with_superexec(
Expand Down Expand Up @@ -169,10 +169,8 @@ def on_channel_state_change(channel_connectivity: str) -> None:


def _run_without_superexec(
config: Dict[str, Any], federation: Dict[str, Any], federation_name: str
directory: Optional[Path], federation: Dict[str, Any], federation_name: str
) -> None:
server_app_ref = config["tool"]["flwr"]["components"]["serverapp"]
client_app_ref = config["tool"]["flwr"]["components"]["clientapp"]

try:
num_supernodes = federation["options"]["num-supernodes"]
Expand All @@ -188,8 +186,18 @@ def _run_without_superexec(
)
raise typer.Exit(code=1) from err

_run_simulation(
server_app_attr=server_app_ref,
client_app_attr=client_app_ref,
num_supernodes=num_supernodes,
command = [
"flower-simulation",
"--app",
f"{directory}",
"--num-supernodes",
f"{num_supernodes}",
]
proc = subprocess.Popen( # pylint: disable=consider-using-with
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)

# TODO: how to show logs?
13 changes: 10 additions & 3 deletions src/py/flwr/server/superlink/fleet/vce/vce_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def _register_nodes(


def _register_node_states(
nodes_mapping: NodeToPartitionMapping, run: Run
nodes_mapping: NodeToPartitionMapping,
run: Run,
app_dir: Optional[str] = None,
) -> Dict[int, NodeState]:
"""Create NodeState objects and pre-register the context for the run."""
node_states: Dict[int, NodeState] = {}
Expand All @@ -76,7 +78,9 @@ def _register_node_states(
)

# Pre-register Context objects
node_states[node_id].register_context(run_id=run.run_id, run=run)
node_states[node_id].register_context(
run_id=run.run_id, run=run, app_dir=app_dir
)

return node_states

Expand Down Expand Up @@ -256,6 +260,7 @@ def start_vce(
backend_name: str,
backend_config_json_stream: str,
app_dir: str,
is_app: bool,
f_stop: threading.Event,
run: Run,
flwr_dir: Optional[str] = None,
Expand Down Expand Up @@ -309,7 +314,9 @@ def start_vce(
)

# Construct mapping of NodeStates
node_states = _register_node_states(nodes_mapping=nodes_mapping, run=run)
node_states = _register_node_states(
nodes_mapping=nodes_mapping, run=run, app_dir=app_dir if is_app else None
)

# Load backend config
log(DEBUG, "Supported backends: %s", list(supported_backends.keys()))
Expand Down
1 change: 1 addition & 0 deletions src/py/flwr/server/superlink/fleet/vce/vce_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def start_and_shutdown(
backend_config_json_stream=backend_config,
state_factory=state_factory,
app_dir=app_dir,
is_app=False,
f_stop=f_stop,
run=run,
existing_nodes_mapping=nodes_mapping,
Expand Down
163 changes: 149 additions & 14 deletions src/py/flwr/simulation/run_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
import asyncio
import json
import logging
import sys
import threading
import traceback
from argparse import Namespace
from logging import DEBUG, ERROR, INFO, WARNING
from pathlib import Path
from time import sleep
from typing import Dict, Optional
from typing import Dict, List, Optional

from flwr.cli.config_utils import load_and_validate
from flwr.client import ClientApp
from flwr.common import EventType, event, log
from flwr.common.config import get_fused_config_from_dir, parse_config_args
from flwr.common.constant import RUN_ID_NUM_BYTES
from flwr.common.logger import set_logger_propagation, update_console_handler
from flwr.common.typing import Run
Expand All @@ -41,28 +46,130 @@
)


def _check_args_do_not_interfere(args: Namespace) -> bool:
"""Ensure decoupling of flags for different ways to start the simulation."""
mode_one_args = ["app", "run_config"]
mode_two_args = ["client_app", "server_app"]

def _resolve_message(conflict_keys: List[str]) -> str:
return ",".join([f"`--{key}`".replace("_", "-") for key in conflict_keys])

# When passing `--app`, `--app-dir` is ignored
if args.app and args.app_dir:
log(ERROR, "Either `--app` or `--app-dir` can be set, but not both.")
return False

if any(getattr(args, key) for key in mode_one_args):
if any(getattr(args, key) for key in mode_two_args):
log(
ERROR,
"Passing any of {%s} alongside with any of {%s}",
_resolve_message(mode_one_args),
_resolve_message(mode_two_args),
)
return False

if not args.app:
log(ERROR, "You need to pass --app")
return False

return True

# Ensure all args are set (required for the non-FAB mode of execution)
if not all(getattr(args, key) for key in mode_two_args):
log(
ERROR,
"Passing all of %s keys are required.",
_resolve_message(mode_two_args),
)
return False

return True


# Entry point from CLI
# pylint: disable=too-many-locals
def run_simulation_from_cli() -> None:
"""Run Simulation Engine from the CLI."""
args = _parse_args_run_simulation().parse_args()

# We are supporting two modes for the CLI entrypoint:
# 1) Running a FAB or FAB-like dir containing a pyproject.toml
# 2) Running any ClientApp and SeverApp w/o pyproject.toml being present
# For 2) some CLI args are cumpolsory but these aren't for 1)
# We first do these checks
args_check_pass = _check_args_do_not_interfere(args)
if not args_check_pass:
sys.exit("Simulation Engine cannot start.")

run_id = (
generate_rand_int_from_bytes(RUN_ID_NUM_BYTES)
if args.run_id is None
else args.run_id
)
if args.app:
# mode 1
app_path = Path(args.app)
if app_path.is_dir():
# Load pyproject.toml
config, errors, warnings = load_and_validate(app_path / "pyproject.toml")
if errors:
raise ValueError(errors)

if warnings:
log(WARNING, warnings)

if config is None:
raise ValueError(
"Config extracted from FAB's pyproject.toml is not valid"
)

# Get ClientApp and SeverApp components
flower_components = config["tool"]["flwr"]["app"]["components"]
client_app_attr = flower_components["clientapp"]
server_app_attr = flower_components["serverapp"]

else:
log(ERROR, "--app is not a directory")
sys.exit("Simulation Engine cannot start.")

override_config = parse_config_args(args.run_config)
fused_config = get_fused_config_from_dir(app_path, override_config)
app_dir = args.app
is_app = True

else:
# mode 2
client_app_attr = args.client_app
server_app_attr = args.server_app
override_config = {}
fused_config = None
app_dir = args.app_dir
is_app = False

# Create run
run = Run(
run_id=run_id,
fab_id="",
fab_version="",
override_config=override_config,
)

# Load JSON config
backend_config_dict = json.loads(args.backend_config)

_run_simulation(
server_app_attr=args.server_app,
client_app_attr=args.client_app,
server_app_attr=server_app_attr,
client_app_attr=client_app_attr,
num_supernodes=args.num_supernodes,
backend_name=args.backend,
backend_config=backend_config_dict,
app_dir=args.app_dir,
run=(
Run(run_id=args.run_id, fab_id="", fab_version="", override_config={})
if args.run_id
else None
),
app_dir=app_dir,
run=run,
enable_tf_gpu_growth=args.enable_tf_gpu_growth,
verbose_logging=args.verbose,
server_app_run_config=fused_config,
is_app=is_app,
)


Expand Down Expand Up @@ -205,13 +312,15 @@ def _main_loop(
backend_name: str,
backend_config_stream: str,
app_dir: str,
is_app: bool,
enable_tf_gpu_growth: bool,
run: Run,
flwr_dir: Optional[str] = None,
client_app: Optional[ClientApp] = None,
client_app_attr: Optional[str] = None,
server_app: Optional[ServerApp] = None,
server_app_attr: Optional[str] = None,
server_app_run_config: Optional[Dict[str, str]] = None,
) -> None:
"""Launch SuperLink with Simulation Engine, then ServerApp on a separate thread."""
# Initialize StateFactory
Expand All @@ -225,7 +334,9 @@ def _main_loop(
# Register run
log(DEBUG, "Pre-registering run with id %s", run.run_id)
state_factory.state().run_ids[run.run_id] = run # type: ignore
server_app_run_config: Dict[str, str] = {}

if server_app_run_config is None:
server_app_run_config = {}

# Initialize Driver
driver = InMemoryDriver(run_id=run.run_id, state_factory=state_factory)
Expand All @@ -251,6 +362,7 @@ def _main_loop(
backend_name=backend_name,
backend_config_json_stream=backend_config_stream,
app_dir=app_dir,
is_app=is_app,
state_factory=state_factory,
f_stop=f_stop,
run=run,
Expand Down Expand Up @@ -284,11 +396,13 @@ def _run_simulation(
backend_config: Optional[BackendConfig] = None,
client_app_attr: Optional[str] = None,
server_app_attr: Optional[str] = None,
server_app_run_config: Optional[Dict[str, str]] = None,
app_dir: str = "",
flwr_dir: Optional[str] = None,
run: Optional[Run] = None,
enable_tf_gpu_growth: bool = False,
verbose_logging: bool = False,
is_app: bool = False,
) -> None:
r"""Launch the Simulation Engine.

Expand Down Expand Up @@ -317,14 +431,18 @@ def _run_simulation(
parameters. Values supported in <value> are those included by
`flwr.common.typing.ConfigsRecordValues`.

client_app_attr : str
client_app_attr : Optional[str]
A path to a `ClientApp` module to be loaded: For example: `client:app` or
`project.package.module:wrapper.app`."

server_app_attr : str
server_app_attr : Optional[str]
A path to a `ServerApp` module to be loaded: For example: `server:app` or
`project.package.module:wrapper.app`."

server_app_run_config : Optional[Dict[str, str]]
Config dictionary that parameterizes the run config. It will be made accesible
to the ServerApp.

app_dir : str
Add specified directory to the PYTHONPATH and load `ClientApp` from there.
(Default: current working directory.)
Expand All @@ -346,6 +464,11 @@ def _run_simulation(
verbose_logging : bool (default: False)
When disabled, only INFO, WARNING and ERROR log messages will be shown. If
enabled, DEBUG-level logs will be displayed.

is_app : bool (default: False)
A flag that indicates whether the simulation is running an app or not. This is
needed in order to attempt loading an app's pyproject.toml when nodes register
a context object.
"""
if backend_config is None:
backend_config = {}
Expand Down Expand Up @@ -381,13 +504,15 @@ def _run_simulation(
backend_name,
backend_config_stream,
app_dir,
is_app,
enable_tf_gpu_growth,
run,
flwr_dir,
client_app,
client_app_attr,
server_app,
server_app_attr,
server_app_run_config,
)
# Detect if there is an Asyncio event loop already running.
# If yes, disable logger propagation. In environmnets
Expand Down Expand Up @@ -419,12 +544,10 @@ def _parse_args_run_simulation() -> argparse.ArgumentParser:
)
parser.add_argument(
"--server-app",
required=True,
help="For example: `server:app` or `project.package.module:wrapper.app`",
)
parser.add_argument(
"--client-app",
required=True,
help="For example: `client:app` or `project.package.module:wrapper.app`",
)
parser.add_argument(
Expand All @@ -433,6 +556,18 @@ def _parse_args_run_simulation() -> argparse.ArgumentParser:
required=True,
help="Number of simulated SuperNodes.",
)
parser.add_argument(
"--app",
type=str,
default=None,
help="Path to a directory containing a FAB-like structure with a "
"pyproject.toml.",
)
parser.add_argument(
"--run-config",
default=None,
help="Override configuration key-value pairs.",
)
parser.add_argument(
"--backend",
default="ray",
Expand Down
Loading