From 87a305cd68babca4c6c6f0c72798e68a9e52fbf9 Mon Sep 17 00:00:00 2001 From: Sebastian van der Voort Date: Fri, 21 Jun 2024 22:03:39 +0200 Subject: [PATCH] feat(framework) Parse initialization arguments to `ray` (#3543) Co-authored-by: svdvoort <23049683+Svdvoort@users.noreply.github.com> Co-authored-by: jafermarq --- dev/test.sh | 2 +- .../superlink/fleet/vce/backend/raybackend.py | 69 ++++++++++++------- .../fleet/vce/backend/raybackend_test.py | 33 +++++++++ .../simulation/ray_transport/ray_actor.py | 6 -- src/py/flwr/simulation/run_simulation.py | 54 +++++++++------ 5 files changed, 111 insertions(+), 53 deletions(-) diff --git a/dev/test.sh b/dev/test.sh index 5b827380bc50..8cbe88c9298b 100755 --- a/dev/test.sh +++ b/dev/test.sh @@ -23,7 +23,7 @@ python -m flwr_tool.init_py_check src/py/flwr src/py/flwr_tool echo "- init_py_check: done" echo "- docformatter: start" -python -m docformatter -c -r src/py/flwr e2e -e src/py/flwr/proto +python -m docformatter -c -r src/py/flwr e2e -e src/py/flwr/proto echo "- docformatter: done" echo "- ruff: start" diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py b/src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py index 93aca583af9c..8a21393db590 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py @@ -15,7 +15,7 @@ """Ray backend for the Fleet API using the Simulation Engine.""" import pathlib -from logging import DEBUG, ERROR, WARNING +from logging import DEBUG, ERROR from typing import Callable, Dict, List, Tuple, Union import ray @@ -24,16 +24,15 @@ from flwr.common.context import Context from flwr.common.logger import log from flwr.common.message import Message -from flwr.simulation.ray_transport.ray_actor import ( - BasicActorPool, - ClientAppActor, - init_ray, -) +from flwr.common.typing import ConfigsRecordValues +from flwr.simulation.ray_transport.ray_actor import BasicActorPool, ClientAppActor from flwr.simulation.ray_transport.utils import enable_tf_gpu_growth from .backend import Backend, BackendConfig ClientResourcesDict = Dict[str, Union[int, float]] +ActorArgsDict = Dict[str, Union[int, float, Callable[[], None]]] +RunTimeEnvDict = Dict[str, Union[str, List[str]]] class RayBackend(Backend): @@ -51,40 +50,29 @@ def __init__( if not pathlib.Path(work_dir).exists(): raise ValueError(f"Specified work_dir {work_dir} does not exist.") - # Init ray and append working dir if needed - runtime_env = ( - self._configure_runtime_env(work_dir=work_dir) if work_dir else None - ) - - if backend_config.get("mute_logging", False): - init_ray( - logging_level=WARNING, log_to_driver=False, runtime_env=runtime_env - ) - elif backend_config.get("silent", False): - init_ray(logging_level=WARNING, log_to_driver=True, runtime_env=runtime_env) - else: - init_ray(runtime_env=runtime_env) + # Initialise ray + self.init_args_key = "init_args" + self.init_ray(backend_config, work_dir) # Validate client resources self.client_resources_key = "client_resources" + client_resources = self._validate_client_resources(config=backend_config) # Create actor pool - use_tf = backend_config.get("tensorflow", False) - actor_kwargs = {"on_actor_init_fn": enable_tf_gpu_growth} if use_tf else {} + actor_kwargs = self._validate_actor_arguments(config=backend_config) - client_resources = self._validate_client_resources(config=backend_config) self.pool = BasicActorPool( actor_type=ClientAppActor, client_resources=client_resources, actor_kwargs=actor_kwargs, ) - def _configure_runtime_env(self, work_dir: str) -> Dict[str, Union[str, List[str]]]: + def _configure_runtime_env(self, work_dir: str) -> RunTimeEnvDict: """Return list of files/subdirectories to exclude relative to work_dir. Without this, Ray will push everything to the Ray Cluster. """ - runtime_env: Dict[str, Union[str, List[str]]] = {"working_dir": work_dir} + runtime_env: RunTimeEnvDict = {"working_dir": work_dir} excludes = [] path = pathlib.Path(work_dir) @@ -125,6 +113,37 @@ def _validate_client_resources(self, config: BackendConfig) -> ClientResourcesDi return client_resources + def _validate_actor_arguments(self, config: BackendConfig) -> ActorArgsDict: + actor_args_config = config.get("actor", False) + actor_args: ActorArgsDict = {} + if actor_args_config: + use_tf = actor_args.get("tensorflow", False) + if use_tf: + actor_args["on_actor_init_fn"] = enable_tf_gpu_growth + return actor_args + + def init_ray(self, backend_config: BackendConfig, work_dir: str) -> None: + """Intialises Ray if not already initialised.""" + if not ray.is_initialized(): + # Init ray and append working dir if needed + runtime_env = ( + self._configure_runtime_env(work_dir=work_dir) if work_dir else None + ) + + ray_init_args: Dict[ + str, + Union[ConfigsRecordValues, RunTimeEnvDict], + ] = {} + + if backend_config.get(self.init_args_key): + for k, v in backend_config[self.init_args_key].items(): + ray_init_args[k] = v + + if runtime_env is not None: + ray_init_args["runtime_env"] = runtime_env + + ray.init(**ray_init_args) + @property def num_workers(self) -> int: """Return number of actors in pool.""" @@ -152,7 +171,7 @@ async def process_message( partition_id = message.metadata.partition_id try: - # Submite a task to the pool + # Submit a task to the pool future = await self.pool.submit( lambda a, a_fn, mssg, cid, state: a.run.remote(a_fn, mssg, cid, state), (app, message, str(partition_id), context), diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/raybackend_test.py b/src/py/flwr/server/superlink/fleet/vce/backend/raybackend_test.py index dcac0b81d666..57c952cc9310 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/raybackend_test.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/raybackend_test.py @@ -38,6 +38,7 @@ ) from flwr.common.object_ref import load_app from flwr.common.recordset_compat import getpropertiesins_to_recordset +from flwr.server.superlink.fleet.vce.backend.backend import BackendConfig from flwr.server.superlink.fleet.vce.backend.raybackend import RayBackend @@ -215,3 +216,35 @@ def test_backend_creation_submit_and_termination_existing_client_app_unsetworkdi workdir="/?&%$^#%@$!", ) self.addAsyncCleanup(self.on_cleanup) + + def test_backend_creation_with_init_arguments(self) -> None: + """Testing whether init args are properly parsed to Ray.""" + backend_config_4: BackendConfig = { + "init_args": {"num_cpus": 4}, + "client_resources": {"num_cpus": 1, "num_gpus": 0}, + } + + backend_config_2: BackendConfig = { + "init_args": {"num_cpus": 2}, + "client_resources": {"num_cpus": 1, "num_gpus": 0}, + } + + RayBackend( + backend_config=backend_config_4, + work_dir="", + ) + nodes = ray.nodes() + + assert nodes[0]["Resources"]["CPU"] == backend_config_4["init_args"]["num_cpus"] + + ray.shutdown() + + RayBackend( + backend_config=backend_config_2, + work_dir="", + ) + nodes = ray.nodes() + + assert nodes[0]["Resources"]["CPU"] == backend_config_2["init_args"]["num_cpus"] + + self.addAsyncCleanup(self.on_cleanup) diff --git a/src/py/flwr/simulation/ray_transport/ray_actor.py b/src/py/flwr/simulation/ray_transport/ray_actor.py index 9caf0fc3e6c0..7afffb865334 100644 --- a/src/py/flwr/simulation/ray_transport/ray_actor.py +++ b/src/py/flwr/simulation/ray_transport/ray_actor.py @@ -399,12 +399,6 @@ def get_client_result( return self._fetch_future_result(cid) -def init_ray(*args: Any, **kwargs: Any) -> None: - """Intialises Ray if not already initialised.""" - if not ray.is_initialized(): - ray.init(*args, **kwargs) - - class BasicActorPool: """A basic actor pool.""" diff --git a/src/py/flwr/simulation/run_simulation.py b/src/py/flwr/simulation/run_simulation.py index a3de1401d252..7c7a412a245b 100644 --- a/src/py/flwr/simulation/run_simulation.py +++ b/src/py/flwr/simulation/run_simulation.py @@ -22,16 +22,17 @@ import traceback from logging import DEBUG, ERROR, INFO, WARNING from time import sleep -from typing import Dict, Optional +from typing import Optional from flwr.client import ClientApp from flwr.common import EventType, event, log from flwr.common.logger import set_logger_propagation, update_console_handler -from flwr.common.typing import ConfigsRecordValues, Run +from flwr.common.typing import Run from flwr.server.driver import Driver, InMemoryDriver from flwr.server.run_serverapp import run from flwr.server.server_app import ServerApp from flwr.server.superlink.fleet import vce +from flwr.server.superlink.fleet.vce.backend.backend import BackendConfig from flwr.server.superlink.state import StateFactory from flwr.simulation.ray_transport.utils import ( enable_tf_gpu_growth as enable_gpu_growth, @@ -66,7 +67,7 @@ def run_simulation( client_app: ClientApp, num_supernodes: int, backend_name: str = "ray", - backend_config: Optional[Dict[str, ConfigsRecordValues]] = None, + backend_config: Optional[BackendConfig] = None, enable_tf_gpu_growth: bool = False, verbose_logging: bool = False, ) -> None: @@ -90,9 +91,12 @@ def run_simulation( backend_name : str (default: ray) A simulation backend that runs `ClientApp`s. - backend_config : Optional[Dict[str, ConfigsRecordValues]] - 'A dictionary, e.g {"": , "": } to configure a - backend. Values supported in are those included by + backend_config : Optional[BackendConfig] + 'A dictionary to configure a backend. Separate dictionaries to configure + different elements of backend. Supported top-level keys are `init_args` + for values parsed to initialisation of backend, `client_resources` + to define the resources for clients, and `actor` to define the actor + parameters. Values supported in are those included by `flwr.common.typing.ConfigsRecordValues`. enable_tf_gpu_growth : bool (default: False) @@ -104,7 +108,7 @@ def run_simulation( works in the TensorFlow documentation: https://www.tensorflow.org/api/stable. verbose_logging : bool (default: False) - When diabled, only INFO, WARNING and ERROR log messages will be shown. If + When disabled, only INFO, WARNING and ERROR log messages will be shown. If enabled, DEBUG-level logs will be displayed. """ _run_simulation( @@ -133,7 +137,7 @@ def run_serverapp_th( def server_th_with_start_checks( # type: ignore tf_gpu_growth: bool, stop_event: asyncio.Event, **kwargs ) -> None: - """Run SeverApp, after check if GPU memory grouwth has to be set. + """Run SeverApp, after check if GPU memory growth has to be set. Upon exception, trigger stop event for Simulation Engine. """ @@ -194,7 +198,7 @@ def _main_loop( ) -> None: """Launch SuperLink with Simulation Engine, then ServerApp on a separate thread. - Everything runs on the main thread or a separate one, depening on whether the main + Everything runs on the main thread or a separate one, depending on whether the main thread already contains a running Asyncio event loop. This is the case if running the Simulation Engine on a Jupyter/Colab notebook. """ @@ -259,7 +263,7 @@ def _run_simulation( client_app: Optional[ClientApp] = None, server_app: Optional[ServerApp] = None, backend_name: str = "ray", - backend_config: Optional[Dict[str, ConfigsRecordValues]] = None, + backend_config: Optional[BackendConfig] = None, client_app_attr: Optional[str] = None, server_app_attr: Optional[str] = None, app_dir: str = "", @@ -286,9 +290,12 @@ def _run_simulation( backend_name : str (default: ray) A simulation backend that runs `ClientApp`s. - backend_config : Optional[Dict[str, ConfigsRecordValues]] - 'A dictionary, e.g {"":, "":} to configure a - backend. Values supported in are those included by + backend_config : Optional[BackendConfig] + 'A dictionary to configure a backend. Separate dictionaries to configure + different elements of backend. Supported top-level keys are `init_args` + for values parsed to initialisation of backend, `client_resources` + to define the resources for clients, and `actor` to define the actor + parameters. Values supported in are those included by `flwr.common.typing.ConfigsRecordValues`. client_app_attr : str @@ -310,30 +317,34 @@ def _run_simulation( A boolean to indicate whether to enable GPU growth on the main thread. This is desirable if you make use of a TensorFlow model on your `ServerApp` while having your `ClientApp` running on the same GPU. Without enabling this, you - might encounter an out-of-memory error becasue TensorFlow by default allocates + might encounter an out-of-memory error because TensorFlow by default allocates all GPU memory. Read mor about how `tf.config.experimental.set_memory_growth()` works in the TensorFlow documentation: https://www.tensorflow.org/api/stable. verbose_logging : bool (default: False) - When diabled, only INFO, WARNING and ERROR log messages will be shown. If + When disabled, only INFO, WARNING and ERROR log messages will be shown. If enabled, DEBUG-level logs will be displayed. """ if backend_config is None: backend_config = {} + if "init_args" not in backend_config: + backend_config["init_args"] = {} + # Set logging level logger = logging.getLogger("flwr") if verbose_logging: update_console_handler(level=DEBUG, timestamps=True, colored=True) else: - backend_config["silent"] = True + backend_config["init_args"]["logging_level"] = WARNING + backend_config["init_args"]["log_to_driver"] = True if enable_tf_gpu_growth: # Check that Backend config has also enabled using GPU growth - use_tf = backend_config.get("tensorflow", False) + use_tf = backend_config.get("actor", {}).get("tensorflow", False) if not use_tf: log(WARNING, "Enabling GPU growth for your backend.") - backend_config["tensorflow"] = True + backend_config["actor"]["tensorflow"] = True # Convert config to original JSON-stream format backend_config_stream = json.dumps(backend_config) @@ -352,7 +363,7 @@ def _run_simulation( server_app_attr, ) # Detect if there is an Asyncio event loop already running. - # If yes, run everything on a separate thread. In environmnets + # If yes, run everything on a separate thread. In environments # like Jupyter/Colab notebooks, there is an event loop present. run_in_thread = False try: @@ -364,7 +375,7 @@ def _run_simulation( run_in_thread = True except RuntimeError: - log(DEBUG, "No asyncio event loop runnig") + log(DEBUG, "No asyncio event loop running") finally: if run_in_thread: @@ -409,7 +420,8 @@ def _parse_args_run_simulation() -> argparse.ArgumentParser: parser.add_argument( "--backend-config", type=str, - default='{"client_resources": {"num_cpus":2, "num_gpus":0.0}, "tensorflow": 0}', + default='{"client_resources": {"num_cpus":2, "num_gpus":0.0},' + '"actor": {"tensorflow": 0}}', help='A JSON formatted stream, e.g \'{"":, "":}\' to ' "configure a backend. Values supported in are those included by " "`flwr.common.typing.ConfigsRecordValues`. ",