Skip to content

Commit

Permalink
feat(framework) Parse initialization arguments to ray (#3543)
Browse files Browse the repository at this point in the history
Co-authored-by: svdvoort <[email protected]>
Co-authored-by: jafermarq <[email protected]>
  • Loading branch information
3 people authored Jun 21, 2024
1 parent 5480b6a commit 87a305c
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 53 deletions.
2 changes: 1 addition & 1 deletion dev/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ python -m flwr_tool.init_py_check src/py/flwr src/py/flwr_tool
echo "- init_py_check: done"

echo "- docformatter: start"
python -m docformatter -c -r src/py/flwr e2e -e src/py/flwr/proto
python -m docformatter -c -r src/py/flwr e2e -e src/py/flwr/proto
echo "- docformatter: done"

echo "- ruff: start"
Expand Down
69 changes: 44 additions & 25 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Ray backend for the Fleet API using the Simulation Engine."""

import pathlib
from logging import DEBUG, ERROR, WARNING
from logging import DEBUG, ERROR
from typing import Callable, Dict, List, Tuple, Union

import ray
Expand All @@ -24,16 +24,15 @@
from flwr.common.context import Context
from flwr.common.logger import log
from flwr.common.message import Message
from flwr.simulation.ray_transport.ray_actor import (
BasicActorPool,
ClientAppActor,
init_ray,
)
from flwr.common.typing import ConfigsRecordValues
from flwr.simulation.ray_transport.ray_actor import BasicActorPool, ClientAppActor
from flwr.simulation.ray_transport.utils import enable_tf_gpu_growth

from .backend import Backend, BackendConfig

ClientResourcesDict = Dict[str, Union[int, float]]
ActorArgsDict = Dict[str, Union[int, float, Callable[[], None]]]
RunTimeEnvDict = Dict[str, Union[str, List[str]]]


class RayBackend(Backend):
Expand All @@ -51,40 +50,29 @@ def __init__(
if not pathlib.Path(work_dir).exists():
raise ValueError(f"Specified work_dir {work_dir} does not exist.")

# Init ray and append working dir if needed
runtime_env = (
self._configure_runtime_env(work_dir=work_dir) if work_dir else None
)

if backend_config.get("mute_logging", False):
init_ray(
logging_level=WARNING, log_to_driver=False, runtime_env=runtime_env
)
elif backend_config.get("silent", False):
init_ray(logging_level=WARNING, log_to_driver=True, runtime_env=runtime_env)
else:
init_ray(runtime_env=runtime_env)
# Initialise ray
self.init_args_key = "init_args"
self.init_ray(backend_config, work_dir)

# Validate client resources
self.client_resources_key = "client_resources"
client_resources = self._validate_client_resources(config=backend_config)

# Create actor pool
use_tf = backend_config.get("tensorflow", False)
actor_kwargs = {"on_actor_init_fn": enable_tf_gpu_growth} if use_tf else {}
actor_kwargs = self._validate_actor_arguments(config=backend_config)

client_resources = self._validate_client_resources(config=backend_config)
self.pool = BasicActorPool(
actor_type=ClientAppActor,
client_resources=client_resources,
actor_kwargs=actor_kwargs,
)

def _configure_runtime_env(self, work_dir: str) -> Dict[str, Union[str, List[str]]]:
def _configure_runtime_env(self, work_dir: str) -> RunTimeEnvDict:
"""Return list of files/subdirectories to exclude relative to work_dir.
Without this, Ray will push everything to the Ray Cluster.
"""
runtime_env: Dict[str, Union[str, List[str]]] = {"working_dir": work_dir}
runtime_env: RunTimeEnvDict = {"working_dir": work_dir}

excludes = []
path = pathlib.Path(work_dir)
Expand Down Expand Up @@ -125,6 +113,37 @@ def _validate_client_resources(self, config: BackendConfig) -> ClientResourcesDi

return client_resources

def _validate_actor_arguments(self, config: BackendConfig) -> ActorArgsDict:
actor_args_config = config.get("actor", False)
actor_args: ActorArgsDict = {}
if actor_args_config:
use_tf = actor_args.get("tensorflow", False)
if use_tf:
actor_args["on_actor_init_fn"] = enable_tf_gpu_growth
return actor_args

def init_ray(self, backend_config: BackendConfig, work_dir: str) -> None:
"""Intialises Ray if not already initialised."""
if not ray.is_initialized():
# Init ray and append working dir if needed
runtime_env = (
self._configure_runtime_env(work_dir=work_dir) if work_dir else None
)

ray_init_args: Dict[
str,
Union[ConfigsRecordValues, RunTimeEnvDict],
] = {}

if backend_config.get(self.init_args_key):
for k, v in backend_config[self.init_args_key].items():
ray_init_args[k] = v

if runtime_env is not None:
ray_init_args["runtime_env"] = runtime_env

ray.init(**ray_init_args)

@property
def num_workers(self) -> int:
"""Return number of actors in pool."""
Expand Down Expand Up @@ -152,7 +171,7 @@ async def process_message(
partition_id = message.metadata.partition_id

try:
# Submite a task to the pool
# Submit a task to the pool
future = await self.pool.submit(
lambda a, a_fn, mssg, cid, state: a.run.remote(a_fn, mssg, cid, state),
(app, message, str(partition_id), context),
Expand Down
33 changes: 33 additions & 0 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
)
from flwr.common.object_ref import load_app
from flwr.common.recordset_compat import getpropertiesins_to_recordset
from flwr.server.superlink.fleet.vce.backend.backend import BackendConfig
from flwr.server.superlink.fleet.vce.backend.raybackend import RayBackend


Expand Down Expand Up @@ -215,3 +216,35 @@ def test_backend_creation_submit_and_termination_existing_client_app_unsetworkdi
workdir="/?&%$^#%@$!",
)
self.addAsyncCleanup(self.on_cleanup)

def test_backend_creation_with_init_arguments(self) -> None:
"""Testing whether init args are properly parsed to Ray."""
backend_config_4: BackendConfig = {
"init_args": {"num_cpus": 4},
"client_resources": {"num_cpus": 1, "num_gpus": 0},
}

backend_config_2: BackendConfig = {
"init_args": {"num_cpus": 2},
"client_resources": {"num_cpus": 1, "num_gpus": 0},
}

RayBackend(
backend_config=backend_config_4,
work_dir="",
)
nodes = ray.nodes()

assert nodes[0]["Resources"]["CPU"] == backend_config_4["init_args"]["num_cpus"]

ray.shutdown()

RayBackend(
backend_config=backend_config_2,
work_dir="",
)
nodes = ray.nodes()

assert nodes[0]["Resources"]["CPU"] == backend_config_2["init_args"]["num_cpus"]

self.addAsyncCleanup(self.on_cleanup)
6 changes: 0 additions & 6 deletions src/py/flwr/simulation/ray_transport/ray_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,6 @@ def get_client_result(
return self._fetch_future_result(cid)


def init_ray(*args: Any, **kwargs: Any) -> None:
"""Intialises Ray if not already initialised."""
if not ray.is_initialized():
ray.init(*args, **kwargs)


class BasicActorPool:
"""A basic actor pool."""

Expand Down
54 changes: 33 additions & 21 deletions src/py/flwr/simulation/run_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import traceback
from logging import DEBUG, ERROR, INFO, WARNING
from time import sleep
from typing import Dict, Optional
from typing import Optional

from flwr.client import ClientApp
from flwr.common import EventType, event, log
from flwr.common.logger import set_logger_propagation, update_console_handler
from flwr.common.typing import ConfigsRecordValues, Run
from flwr.common.typing import Run
from flwr.server.driver import Driver, InMemoryDriver
from flwr.server.run_serverapp import run
from flwr.server.server_app import ServerApp
from flwr.server.superlink.fleet import vce
from flwr.server.superlink.fleet.vce.backend.backend import BackendConfig
from flwr.server.superlink.state import StateFactory
from flwr.simulation.ray_transport.utils import (
enable_tf_gpu_growth as enable_gpu_growth,
Expand Down Expand Up @@ -66,7 +67,7 @@ def run_simulation(
client_app: ClientApp,
num_supernodes: int,
backend_name: str = "ray",
backend_config: Optional[Dict[str, ConfigsRecordValues]] = None,
backend_config: Optional[BackendConfig] = None,
enable_tf_gpu_growth: bool = False,
verbose_logging: bool = False,
) -> None:
Expand All @@ -90,9 +91,12 @@ def run_simulation(
backend_name : str (default: ray)
A simulation backend that runs `ClientApp`s.
backend_config : Optional[Dict[str, ConfigsRecordValues]]
'A dictionary, e.g {"<keyA>": <value>, "<keyB>": <value>} to configure a
backend. Values supported in <value> are those included by
backend_config : Optional[BackendConfig]
'A dictionary to configure a backend. Separate dictionaries to configure
different elements of backend. Supported top-level keys are `init_args`
for values parsed to initialisation of backend, `client_resources`
to define the resources for clients, and `actor` to define the actor
parameters. Values supported in <value> are those included by
`flwr.common.typing.ConfigsRecordValues`.
enable_tf_gpu_growth : bool (default: False)
Expand All @@ -104,7 +108,7 @@ def run_simulation(
works in the TensorFlow documentation: https://www.tensorflow.org/api/stable.
verbose_logging : bool (default: False)
When diabled, only INFO, WARNING and ERROR log messages will be shown. If
When disabled, only INFO, WARNING and ERROR log messages will be shown. If
enabled, DEBUG-level logs will be displayed.
"""
_run_simulation(
Expand Down Expand Up @@ -133,7 +137,7 @@ def run_serverapp_th(
def server_th_with_start_checks( # type: ignore
tf_gpu_growth: bool, stop_event: asyncio.Event, **kwargs
) -> None:
"""Run SeverApp, after check if GPU memory grouwth has to be set.
"""Run SeverApp, after check if GPU memory growth has to be set.
Upon exception, trigger stop event for Simulation Engine.
"""
Expand Down Expand Up @@ -194,7 +198,7 @@ def _main_loop(
) -> None:
"""Launch SuperLink with Simulation Engine, then ServerApp on a separate thread.
Everything runs on the main thread or a separate one, depening on whether the main
Everything runs on the main thread or a separate one, depending on whether the main
thread already contains a running Asyncio event loop. This is the case if running
the Simulation Engine on a Jupyter/Colab notebook.
"""
Expand Down Expand Up @@ -259,7 +263,7 @@ def _run_simulation(
client_app: Optional[ClientApp] = None,
server_app: Optional[ServerApp] = None,
backend_name: str = "ray",
backend_config: Optional[Dict[str, ConfigsRecordValues]] = None,
backend_config: Optional[BackendConfig] = None,
client_app_attr: Optional[str] = None,
server_app_attr: Optional[str] = None,
app_dir: str = "",
Expand All @@ -286,9 +290,12 @@ def _run_simulation(
backend_name : str (default: ray)
A simulation backend that runs `ClientApp`s.
backend_config : Optional[Dict[str, ConfigsRecordValues]]
'A dictionary, e.g {"<keyA>":<value>, "<keyB>":<value>} to configure a
backend. Values supported in <value> are those included by
backend_config : Optional[BackendConfig]
'A dictionary to configure a backend. Separate dictionaries to configure
different elements of backend. Supported top-level keys are `init_args`
for values parsed to initialisation of backend, `client_resources`
to define the resources for clients, and `actor` to define the actor
parameters. Values supported in <value> are those included by
`flwr.common.typing.ConfigsRecordValues`.
client_app_attr : str
Expand All @@ -310,30 +317,34 @@ def _run_simulation(
A boolean to indicate whether to enable GPU growth on the main thread. This is
desirable if you make use of a TensorFlow model on your `ServerApp` while
having your `ClientApp` running on the same GPU. Without enabling this, you
might encounter an out-of-memory error becasue TensorFlow by default allocates
might encounter an out-of-memory error because TensorFlow by default allocates
all GPU memory. Read mor about how `tf.config.experimental.set_memory_growth()`
works in the TensorFlow documentation: https://www.tensorflow.org/api/stable.
verbose_logging : bool (default: False)
When diabled, only INFO, WARNING and ERROR log messages will be shown. If
When disabled, only INFO, WARNING and ERROR log messages will be shown. If
enabled, DEBUG-level logs will be displayed.
"""
if backend_config is None:
backend_config = {}

if "init_args" not in backend_config:
backend_config["init_args"] = {}

# Set logging level
logger = logging.getLogger("flwr")
if verbose_logging:
update_console_handler(level=DEBUG, timestamps=True, colored=True)
else:
backend_config["silent"] = True
backend_config["init_args"]["logging_level"] = WARNING
backend_config["init_args"]["log_to_driver"] = True

if enable_tf_gpu_growth:
# Check that Backend config has also enabled using GPU growth
use_tf = backend_config.get("tensorflow", False)
use_tf = backend_config.get("actor", {}).get("tensorflow", False)
if not use_tf:
log(WARNING, "Enabling GPU growth for your backend.")
backend_config["tensorflow"] = True
backend_config["actor"]["tensorflow"] = True

# Convert config to original JSON-stream format
backend_config_stream = json.dumps(backend_config)
Expand All @@ -352,7 +363,7 @@ def _run_simulation(
server_app_attr,
)
# Detect if there is an Asyncio event loop already running.
# If yes, run everything on a separate thread. In environmnets
# If yes, run everything on a separate thread. In environments
# like Jupyter/Colab notebooks, there is an event loop present.
run_in_thread = False
try:
Expand All @@ -364,7 +375,7 @@ def _run_simulation(
run_in_thread = True

except RuntimeError:
log(DEBUG, "No asyncio event loop runnig")
log(DEBUG, "No asyncio event loop running")

finally:
if run_in_thread:
Expand Down Expand Up @@ -409,7 +420,8 @@ def _parse_args_run_simulation() -> argparse.ArgumentParser:
parser.add_argument(
"--backend-config",
type=str,
default='{"client_resources": {"num_cpus":2, "num_gpus":0.0}, "tensorflow": 0}',
default='{"client_resources": {"num_cpus":2, "num_gpus":0.0},'
'"actor": {"tensorflow": 0}}',
help='A JSON formatted stream, e.g \'{"<keyA>":<value>, "<keyB>":<value>}\' to '
"configure a backend. Values supported in <value> are those included by "
"`flwr.common.typing.ConfigsRecordValues`. ",
Expand Down

0 comments on commit 87a305c

Please sign in to comment.