Skip to content

Commit

Permalink
refactor(framework) Handle unsuitable resources for simulation (#4143)
Browse files Browse the repository at this point in the history
  • Loading branch information
jafermarq authored Sep 11, 2024
1 parent 5dffa05 commit 0613593
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
33 changes: 21 additions & 12 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,11 @@ def __init__(

# Validate client resources
self.client_resources_key = "client_resources"
client_resources = self._validate_client_resources(config=backend_config)
self.client_resources = self._validate_client_resources(config=backend_config)

# Create actor pool
actor_kwargs = self._validate_actor_arguments(config=backend_config)

self.pool = BasicActorPool(
actor_type=ClientAppActor,
client_resources=client_resources,
actor_kwargs=actor_kwargs,
)
# Valide actor resources
self.actor_kwargs = self._validate_actor_arguments(config=backend_config)
self.pool: Optional[BasicActorPool] = None

self.app_fn: Optional[Callable[[], ClientApp]] = None

Expand Down Expand Up @@ -122,14 +117,24 @@ def init_ray(self, backend_config: BackendConfig) -> None:
@property
def num_workers(self) -> int:
"""Return number of actors in pool."""
return self.pool.num_actors
return self.pool.num_actors if self.pool else 0

def is_worker_idle(self) -> bool:
"""Report whether the pool has idle actors."""
return self.pool.is_actor_available()
return self.pool.is_actor_available() if self.pool else False

def build(self, app_fn: Callable[[], ClientApp]) -> None:
"""Build pool of Ray actors that this backend will submit jobs to."""
# Create Actor Pool
try:
self.pool = BasicActorPool(
actor_type=ClientAppActor,
client_resources=self.client_resources,
actor_kwargs=self.actor_kwargs,
)
except Exception as ex:
raise ex

self.pool.add_actors_to_pool(self.pool.actors_capacity)
# Set ClientApp callable that ray actors will use
self.app_fn = app_fn
Expand All @@ -146,6 +151,9 @@ def process_message(
"""
partition_id = context.node_config[PARTITION_ID_KEY]

if self.pool is None:
raise ValueError("The actor pool is empty, unfit to process messages.")

if self.app_fn is None:
raise ValueError(
"Unspecified function to load a `ClientApp`. "
Expand Down Expand Up @@ -179,6 +187,7 @@ def process_message(

def terminate(self) -> None:
"""Terminate all actors in actor pool."""
self.pool.terminate_all_actors()
if self.pool:
self.pool.terminate_all_actors()
ray.shutdown()
log(DEBUG, "Terminated %s", self.__class__.__name__)
4 changes: 2 additions & 2 deletions src/py/flwr/simulation/ray_transport/ray_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,14 @@ def pool_size_from_resources(client_resources: Dict[str, Union[int, float]]) ->
WARNING,
"The ActorPool is empty. The system (CPUs=%s, GPUs=%s) "
"does not meet the criteria to host at least one client with resources:"
" %s. Lowering the `client_resources` could help.",
" %s. Lowering these resources could help.",
num_cpus,
num_gpus,
client_resources,
)
raise ValueError(
"ActorPool is empty. Stopping Simulation. "
"Check 'client_resources' passed to `start_simulation`"
"Check `num_cpus` and/or `num_gpus` passed to the simulation engine"
)

return total_num_actors
Expand Down

0 comments on commit 0613593

Please sign in to comment.