Skip to content

Commit

Permalink
Add RayBackend and SimpleActorPool (#2997)
Browse files Browse the repository at this point in the history
  • Loading branch information
jafermarq authored Feb 23, 2024
1 parent 334a9ff commit de5af24
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 4 deletions.
26 changes: 26 additions & 0 deletions src/py/flwr/server/superlink/fleet/vce/backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,34 @@
# ==============================================================================
"""Simulation Engine Backends."""

import importlib
from typing import Dict, Type

from .backend import Backend, BackendConfig

is_ray_installed = importlib.util.find_spec("ray") is not None

# Mapping of supported backends
supported_backends: Dict[str, Type[Backend]] = {}

# To log backend-specific error message when chosen backend isn't available
error_messages_backends: Dict[str, str] = {}

if is_ray_installed:
from .raybackend import RayBackend

supported_backends["ray"] = RayBackend
else:
error_messages_backends[
"ray"
] = """Unable to import module `ray`.
To install the necessary dependencies, install `flwr` with the `simulation` extra:
pip install -U flwr["simulation"]
"""


__all__ = [
"Backend",
"BackendConfig",
Expand Down
3 changes: 3 additions & 0 deletions src/py/flwr/server/superlink/fleet/vce/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
class Backend(ABC):
"""Abstract base class for a Simulation Engine Backend."""

def __init__(self, backend_config: BackendConfig, work_dir: str) -> None:
"""Construct a backend."""

@abstractmethod
async def build(self) -> None:
"""Build backend asynchronously.
Expand Down
153 changes: 153 additions & 0 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Ray backend for the Fleet API using the Simulation Engine."""

import asyncio
import pathlib
from logging import INFO
from typing import Callable, Dict, List, Tuple, Union

from flwr.client.clientapp import ClientApp
from flwr.common.context import Context
from flwr.common.logger import log
from flwr.common.message import Message
from flwr.simulation.ray_transport.ray_actor import (
BasicActorPool,
ClientAppActor,
init_ray,
)

from .backend import Backend, BackendConfig

ClientResourcesDict = Dict[str, Union[int, float]]


class RayBackend(Backend):
"""A backend that submits jobs to a `BasicActorPool`."""

def __init__(
self,
backend_config: BackendConfig,
work_dir: str,
) -> None:
"""Prepare RayBackend by initialising Ray and creating the ActorPool."""
log(INFO, "Initialising: %s", self.__class__.__name__)
log(INFO, "Backend config: %s", backend_config)

# Init ray and append working dir if needed
runtime_env = (
self._configure_runtime_env(work_dir=work_dir) if work_dir else None
)
init_ray(runtime_env=runtime_env)

# Validate client resources
self.client_resources_key = "client_resources"

# Create actor pool
actor_kwargs = backend_config.get("actor_kwargs", {})
client_resources = self._validate_client_resources(config=backend_config)
self.pool = BasicActorPool(
actor_type=ClientAppActor,
client_resources=client_resources,
actor_kwargs=actor_kwargs,
)

def _configure_runtime_env(self, work_dir: str) -> Dict[str, Union[str, List[str]]]:
"""Return list of files/subdirectories to exclude relative to work_dir.
Without this, Ray will push everything to the Ray Cluster.
"""
runtime_env: Dict[str, Union[str, List[str]]] = {"working_dir": work_dir}

excludes = []
path = pathlib.Path(work_dir)
for p in path.rglob("*"):
# Exclude files need to be relative to the working_dir
if p.is_file() and not str(p).endswith(".py"):
excludes.append(str(p.relative_to(path)))
runtime_env["excludes"] = excludes

return runtime_env

def _validate_client_resources(self, config: BackendConfig) -> ClientResourcesDict:
client_resources_config = config.get(self.client_resources_key)
client_resources: ClientResourcesDict = {}
valid_types = (int, float)
if client_resources_config:
for k, v in client_resources_config.items():
if not isinstance(k, str):
raise ValueError(
f"client resources keys are expected to be `str` but you used "
f"{type(k)} for `{k}`"
)
if not isinstance(v, valid_types):
raise ValueError(
f"client resources are expected to be of type {valid_types} "
f"but found `{type(v)}` for key `{k}`",
)
client_resources[k] = v

else:
client_resources = {"num_cpus": 2, "num_gpus": 0.0}
log(
INFO,
"`%s` not specified in backend config. Applying default setting: %s",
self.client_resources_key,
client_resources,
)

return client_resources

@property
def num_workers(self) -> int:
"""Return number of actors in pool."""
return self.pool.num_actors

def is_worker_idle(self) -> bool:
"""Report whether the pool has idle actors."""
return self.pool.is_actor_available()

async def build(self) -> None:
"""Build pool of Ray actors that this backend will submit jobs to."""
await self.pool.add_actors_to_pool(self.pool.actors_capacity)
log(INFO, "Constructed ActorPool with: %i actors", self.pool.num_actors)

async def process_message(
self,
app: Callable[[], ClientApp],
message: Message,
context: Context,
) -> Tuple[Message, Context]:
"""Run ClientApp that process a given message.
Return output message and updated context.
"""
node_id = message.metadata.dst_node_id

# Submite a task to the pool
future = await self.pool.submit(
lambda a, a_fn, mssg, cid, state: a.run.remote(a_fn, mssg, cid, state),
(app, message, str(node_id), context),
)

await asyncio.wait([future])

# Fetch result
(
out_mssg,
updated_context,
) = await self.pool.fetch_result_and_return_actor_to_pool(future)

return out_mssg, updated_context
25 changes: 22 additions & 3 deletions src/py/flwr/server/superlink/fleet/vce/vce_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
"""Fleet VirtualClientEngine API."""

import json
from logging import INFO
from logging import ERROR, INFO
from typing import Dict

from flwr.client.clientapp import ClientApp, load_client_app
from flwr.client.node_state import NodeState
from flwr.common.logger import log
from flwr.server.superlink.state import StateFactory

from .backend import error_messages_backends, supported_backends

NodeToPartitionMapping = Dict[int, int]


Expand Down Expand Up @@ -60,9 +62,26 @@ def start_vce(
node_states[node_id] = NodeState()

# Load backend config
_ = json.loads(backend_config_json_stream)
log(INFO, "Supported backends: %s", list(supported_backends.keys()))
backend_config = json.loads(backend_config_json_stream)

try:
backend_type = supported_backends[backend_name]
_ = backend_type(backend_config, work_dir=working_dir)
except KeyError as ex:
log(
ERROR,
"Backend `%s`, is not supported. Use any of %s or add support "
"for a new backend.",
backend_name,
list(supported_backends.keys()),
)
if backend_name in error_messages_backends:
log(ERROR, error_messages_backends[backend_name])

raise ex

log(INFO, "client_app_str = %s", client_app_module_name)
log(INFO, "client_app_module_name = %s", client_app_module_name)

def _load() -> ClientApp:
app: ClientApp = load_client_app(client_app_module_name)
Expand Down
77 changes: 76 additions & 1 deletion src/py/flwr/simulation/ray_transport/ray_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# ==============================================================================
"""Ray-based Flower Actor and ActorPool implementation."""


import asyncio
import threading
import traceback
from abc import ABC
Expand Down Expand Up @@ -414,3 +414,78 @@ def get_client_result(
# Fetch result belonging to the VirtualClient calling this method
# Return both result from tasks and (potentially) updated run context
return self._fetch_future_result(cid)


def init_ray(*args: Any, **kwargs: Any) -> None:
"""Intialises Ray if not already initialised."""
if not ray.is_initialized():
ray.init(*args, **kwargs)


class BasicActorPool:
"""A basic actor pool."""

def __init__(
self,
actor_type: Type[VirtualClientEngineActor],
client_resources: Dict[str, Union[int, float]],
actor_kwargs: Dict[str, Any],
):
self.client_resources = client_resources

# Queue of idle actors
self.pool: "asyncio.Queue[Type[VirtualClientEngineActor]]" = asyncio.Queue()
self.num_actors = 0

# Resolve arguments to pass during actor init
actor_args = {} if actor_kwargs is None else actor_kwargs

# A function that creates an actor
self.create_actor_fn = lambda: actor_type.options( # type: ignore
**client_resources
).remote(**actor_args)

# Figure out how many actors can be created given the cluster resources
# and the resources the user indicates each VirtualClient will need
self.actors_capacity = pool_size_from_resources(client_resources)
self._future_to_actor: Dict[Any, Type[VirtualClientEngineActor]] = {}

def is_actor_available(self) -> bool:
"""Return true if there is an idle actor."""
return self.pool.qsize() > 0

async def add_actors_to_pool(self, num_actors: int) -> None:
"""Add actors to the pool.
This method may be executed also if new resources are added to your Ray cluster
(e.g. you add a new node).
"""
for _ in range(num_actors):
await self.pool.put(self.create_actor_fn()) # type: ignore
self.num_actors += num_actors

async def submit(
self, actor_fn: Any, job: Tuple[ClientAppFn, Message, str, Context]
) -> Any:
"""On idle actor, submit job and return future."""
# Remove idle actor from pool
actor = await self.pool.get()
# Submit job to actor
app_fn, mssg, cid, context = job
future = actor_fn(actor, app_fn, mssg, cid, context)
# Keep track of future:actor (so we can fetch the actor upon job completion
# and add it back to the pool)
self._future_to_actor[future] = actor
return future

async def fetch_result_and_return_actor_to_pool(
self, future: Any
) -> Tuple[Message, Context]:
"""Pull result given a future and add actor back to pool."""
# Get actor that ran job
actor = self._future_to_actor.pop(future)
await self.pool.put(actor)
# Retrieve result for object store
# Instead of doing ray.get(future) we await it
_, out_mssg, updated_context = await future
return out_mssg, updated_context

0 comments on commit de5af24

Please sign in to comment.