Skip to content

Commit

Permalink
Merge pull request #146 from fanlai0990/master
Browse files Browse the repository at this point in the history
Fix wrong path variable
  • Loading branch information
fanlai0990 authored Jul 18, 2022
2 parents 4861413 + 964c213 commit 94f78e5
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 11 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ If you have [Anaconda](https://www.anaconda.com/products/distribution#download-s
cd FedScale
# Please replace ~/.bashrc with ~/.bash_profile for MacOS
FEDSCALE_HOME=$(pwd)
echo export FEDSCALE_HOME=$(pwd) >> ~/.bashrc
echo alias fedscale=\'bash $FEDSCALE_HOME/fedscale.sh\' >> ~/.bashrc
conda init bash
Expand Down
8 changes: 4 additions & 4 deletions fedscale/core/aggregation/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def client_register_handler(self, executorId, info):

clientId = (
self.num_of_clients+1) if self.experiment_mode == commons.SIMULATION_MODE else executorId
self.client_manager.registerClient(
self.client_manager.register_client(
executorId, clientId, size=_size, speed=systemProfile)
self.client_manager.registerDuration(clientId, batch_size=self.args.batch_size,
upload_step=self.args.local_steps, upload_size=self.model_update_size, download_size=self.model_update_size)
Expand Down Expand Up @@ -356,7 +356,7 @@ def select_participants(self, select_num_participants, overcommitment=1.3):
list of int: The list of sampled clients id.
"""
return sorted(self.client_manager.resampleClients(
return sorted(self.client_manager.select_participants(
int(select_num_participants*overcommitment),
cur_time=self.global_virtual_clock),
)
Expand All @@ -379,7 +379,7 @@ def client_completion_handler(self, results):
self.stats_util_accumulator.append(results['utility'])
self.loss_accumulator.append(results['moving_loss'])

self.client_manager.registerScore(results['clientId'], results['utility'],
self.client_manager.register_feedback(results['clientId'], results['utility'],
auxi=math.sqrt(
results['moving_loss']),
time_stamp=self.round,
Expand Down Expand Up @@ -508,7 +508,7 @@ def round_completion_handler(self):
max(1, len(self.stats_util_accumulator))
# assign avg reward to explored, but not ran workers
for clientId in self.round_stragglers:
self.client_manager.registerScore(clientId, avgUtilLastround,
self.client_manager.register_feedback(clientId, avgUtilLastround,
time_stamp=self.round,
duration=self.virtual_client_clock[clientId]['computation'] +
self.virtual_client_clock[clientId]['communication'],
Expand Down
51 changes: 45 additions & 6 deletions fedscale/core/client_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import math
import pickle
from random import Random
from typing import Dict, List

from fedscale.core.internal.client import Client

Expand Down Expand Up @@ -41,7 +42,19 @@ def __init__(self, mode, args, sample_seed=233):
self.user_trace_keys = list(self.user_trace.keys())

def registerClient(self, hostId, clientId, size, speed, duration=1):
self.register_client(hostId, clientId, size, speed, duration)

def register_client(self, hostId: int, clientId: int, size: int, speed: Dict[str, float], duration: float=1) -> None:
"""Register client information to the client manager.
Args:
hostId (int): executor Id.
clientId (int): client Id.
size (int): number of samples on this client.
speed (Dict[str, float]): device speed (e.g., compuutation and communication).
duration (float): execution latency.
"""
uniqueId = self.getUniqueId(hostId, clientId)
user_trace = None if self.user_trace is None else self.user_trace[self.user_trace_keys[int(
clientId) % len(self.user_trace)]]
Expand All @@ -60,7 +73,7 @@ def registerClient(self, hostId, clientId, size, speed, duration=1):
self.ucbSampler.register_client(clientId, feedbacks=feedbacks)
else:
del self.Clients[uniqueId]

def getAllClients(self):
return self.feasibleClients

Expand All @@ -80,7 +93,6 @@ def registerDuration(self, clientId, batch_size, upload_step, upload_size, downl
clientId, exe_cost['computation']+exe_cost['communication'])

def getCompletionTime(self, clientId, batch_size, upload_step, upload_size, download_size):

return self.Clients[self.getUniqueId(0, clientId)].getCompletionTime(
batch_size=batch_size, upload_step=upload_step,
upload_size=upload_size, download_size=download_size
Expand All @@ -91,6 +103,20 @@ def registerSpeed(self, hostId, clientId, speed):
self.Clients[uniqueId].speed = speed

def registerScore(self, clientId, reward, auxi=1.0, time_stamp=0, duration=1., success=True):
self.register_feedback(clientId, reward, auxi=auxi, time_stamp=time_stamp, duration=duration, success=success)

def register_feedback(self, clientId: int, reward: float, auxi: float=1.0, time_stamp: float=0, duration: float=1., success: bool=True) -> None:
"""Collect client execution feedbacks of last round.
Args:
clientId (int): client Id.
reward (float): execution utilities (processed feedbacks).
auxi (float): unprocessed feedbacks.
time_stamp (float): current wall clock time.
duration (float): system execution duration.
success (bool): whether this client runs successfully.
"""
# currently, we only use distance as reward
if self.mode == "oort":
feedbacks = {
Expand Down Expand Up @@ -180,27 +206,40 @@ def getFeasibleClients(self, cur_time):
def isClientActive(self, clientId, cur_time):
return self.Clients[self.getUniqueId(0, clientId)].isActive(cur_time)

def resampleClients(self, numOfClients, cur_time=0):
def select_participants(self, num_of_clients: int, cur_time: float=0) -> List[int]:
"""Select participating clients for current execution task.
Args:
num_of_clients (int): number of participants to select.
cur_time (float): current wall clock time.
Returns:
List[int]: indices of selected clients.
"""
self.count += 1

clients_online = self.getFeasibleClients(cur_time)

if len(clients_online) <= numOfClients:
if len(clients_online) <= num_of_clients:
return clients_online

pickled_clients = None
clients_online_set = set(clients_online)

if self.mode == "oort" and self.count > 1:
pickled_clients = self.ucbSampler.select_participant(
numOfClients, feasible_clients=clients_online_set)
num_of_clients, feasible_clients=clients_online_set)
else:
self.rng.shuffle(clients_online)
client_len = min(numOfClients, len(clients_online) - 1)
client_len = min(num_of_clients, len(clients_online) - 1)
pickled_clients = clients_online[:client_len]

return pickled_clients

def resampleClients(self, numOfClients, cur_time=0):
return self.select_participants(numOfClients, cur_time)

def getAllMetrics(self):
if self.mode == "oort":
return self.ucbSampler.getAllMetrics()
Expand Down
3 changes: 2 additions & 1 deletion install.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/bin/bash
FEDSCALE_HOME=$(pwd)
echo export FEDSCALE_HOME=$(pwd) >> ~/.bashrc
echo alias fedscale=\'bash ${FEDSCALE_HOME}/fedscale.sh\' >> ~/.bashrc
echo alias fedscale=\'bash ${FEDSCALE_HOME}/fedscale.sh\' >> ~/.bashrc

isPackageNotInstalled() {
$1 --version &> /dev/null
Expand Down

0 comments on commit 94f78e5

Please sign in to comment.