simplifying strategy; load data once; bumped flwr 1.6; other minor changes
jafermarq committed Dec 21, 2023
1 parent 081e833 commit 1ac0b87
Showing 7 changed files with 36 additions and 132 deletions.
28 changes: 16 additions & 12 deletions examples/flower-via-docker-compose/client.py
@@ -21,6 +21,7 @@
parser.add_argument('--learning_rate', type=float, default=0.1, help="Learning rate for the optimizer")
parser.add_argument('--client_id', type=int, default=1, help="Unique ID for the client")
parser.add_argument('--total_clients', type=int, default=2, help="Total number of clients")
parser.add_argument('--data_percentage', type=float, default=0.5, help='Portion of client data to use')

args = parser.parse_args()

@@ -34,6 +35,15 @@ class Client(fl.client.NumPyClient):
def __init__(self, args):
self.args = args

logger.info("Preparing data...")
train_d, test_d = load_data(data_sampling_percentage=self.args.data_percentage,
batch_size=self.args.batch_size,
client_id=self.args.client_id,
total_clients=self.args.total_clients)

self.train_dataset = train_d
self.test_dataset = test_d

def get_parameters(self, config):
# Return the parameters of the model
return model.get_model().get_weights()
@@ -43,12 +53,9 @@ def fit(self, parameters, config):

# Set the weights of the model
model.get_model().set_weights(parameters)

# Load the training dataset and get the number of examples
train_dataset, _, num_examples_train, _ = load_data(batch_size=self.args.batch_size,client_id=self.args.client_id,total_clients=self.args.total_clients)

# Train the model
history = model.get_model().fit(train_dataset)
history = model.get_model().fit(self.train_dataset)

# Calculate evaluation metric
results = {
@@ -59,29 +66,26 @@ def fit(self, parameters, config):
parameters_prime = model.get_model().get_weights()

# Directly return the parameters and the number of examples trained on
return parameters_prime, num_examples_train, results

return parameters_prime, len(self.train_dataset), results


def evaluate(self, parameters, config):

# Set the weights of the model
model.get_model().set_weights(parameters)

# Use the test dataset for evaluation
_, test_dataset, _, num_examples_test = load_data(batch_size=self.args.batch_size)

# Evaluate the model and get the loss and accuracy
loss, accuracy = model.get_model().evaluate(test_dataset)
loss, accuracy = model.get_model().evaluate(self.test_dataset)

# Return the loss, the number of examples evaluated on and the accuracy
return float(loss), num_examples_test, {"accuracy": float(accuracy)}
return float(loss), len(self.test_dataset), {"accuracy": float(accuracy)}


# Function to Start the Client
def start_fl_client():
try:
fl.client.start_numpy_client(server_address=args.server_address, client=Client(args))
client = Client(args).to_client()
fl.client.start_client(server_address=args.server_address, client=client)
except Exception as e:
logger.error("Error starting FL client: %s", e)
return {"status": "error", "message": str(e)}
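
For context, a minimal self-contained sketch of the pattern this change introduces: the data partition is loaded once in __init__ and reused by fit and evaluate, and the client is started through the flwr 1.6 entry point (start_client plus NumPyClient.to_client). The tiny dense model and random in-memory data below are placeholders, not the example's actual MobileNetV2/CIFAR-10 setup. Note that len() on a batched tf.data.Dataset returns the number of batches, not the number of samples.

import flwr as fl
import numpy as np
import tensorflow as tf

def build_model():
    # Placeholder model; the example itself uses MobileNetV2
    model = tf.keras.Sequential(
        [tf.keras.layers.Dense(10, activation="softmax", input_shape=(4,))]
    )
    model.compile(optimizer="sgd",
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])
    return model

class SketchClient(fl.client.NumPyClient):
    def __init__(self, batch_size=8):
        self.model = build_model()
        # Load data once at construction time; fit() and evaluate() reuse it
        x = np.random.rand(64, 4).astype("float32")
        y = np.random.randint(0, 10, size=64)
        self.train_dataset = tf.data.Dataset.from_tensor_slices((x, y)).batch(batch_size)
        self.test_dataset = tf.data.Dataset.from_tensor_slices((x, y)).batch(batch_size)

    def get_parameters(self, config):
        return self.model.get_weights()

    def fit(self, parameters, config):
        self.model.set_weights(parameters)
        history = self.model.fit(self.train_dataset, verbose=0)
        # len(train_dataset) here is the number of batches in the dataset
        return self.model.get_weights(), len(self.train_dataset), {
            "accuracy": float(history.history["accuracy"][-1])
        }

    def evaluate(self, parameters, config):
        self.model.set_weights(parameters)
        loss, accuracy = self.model.evaluate(self.test_dataset, verbose=0)
        return float(loss), len(self.test_dataset), {"accuracy": float(accuracy)}

if __name__ == "__main__":
    # flwr 1.6: wrap the NumPyClient and pass it to start_client
    fl.client.start_client(server_address="127.0.0.1:8080",
                           client=SketchClient().to_client())
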
@@ -969,7 +969,7 @@
"type": "table"
}
],
"refresh": "",
"refresh": "auto",
"schemaVersion": 38,
"tags": [],
"templating": {
13 changes: 5 additions & 8 deletions examples/flower-via-docker-compose/helpers/load_data.py
@@ -14,11 +14,10 @@ def load_data(data_sampling_percentage=0.0005,batch_size=32,client_id=1,total_cl
batch_size (int): Batch size for training and evaluation.
client_id (int): Unique ID for the client.
total_clients (int): Total number of clients.
data_sampling_percentage (float): Percentage of the dataset to use for training and evaluation.
data_sampling_percentage (float): Percentage of the dataset to use for training.
Returns:
Tuple of TensorFlow datasets for training and evaluation,
and the number of samples in each dataset.
Tuple of TensorFlow datasets for training and evaluation.
"""

logger.info("Loaded federated dataset partition for client %s", client_id)
@@ -38,13 +37,12 @@ def load_data(data_sampling_percentage=0.0005,batch_size=32,client_id=1,total_cl
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))

# Calculate subset size for train and test datasets
# Calculate subset size for train dataset
train_subset_size = int(len(x_train) * data_sampling_percentage)
test_subset_size = int(len(x_test) * data_sampling_percentage)

# Shuffle and subset data
train_dataset = train_dataset.shuffle(buffer_size=len(x_train)).take(train_subset_size)
test_dataset = test_dataset.shuffle(buffer_size=len(x_test)).take(test_subset_size)
test_dataset = test_dataset.shuffle(buffer_size=len(x_test))

# Batch data
train_dataset = train_dataset.batch(batch_size)
@@ -56,6 +54,5 @@ def load_data(data_sampling_percentage=0.0005,batch_size=32,client_id=1,total_cl

logger.info("Created data generators with batch size: %s", batch_size)
logger.info("Created data generators with train_subset_size: %s", train_subset_size)
logger.info("Created data generators with test_subset_size: %s", test_subset_size)

return train_dataset, test_dataset, train_subset_size, test_subset_size
return train_dataset, test_dataset
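
Putting the loader changes together, a minimal sketch of what the simplified load_data could look like end to end. CIFAR-10 via tf.keras.datasets and a contiguous per-client split are assumptions made purely for illustration; the example's real partitioning may differ. Only the training data is subsampled by data_sampling_percentage, the test set is shuffled but kept whole, and the function now returns just the two datasets.

import tensorflow as tf

def load_data_sketch(data_sampling_percentage=0.5, batch_size=32, client_id=1, total_clients=2):
    """Sketch: return (train_dataset, test_dataset) for one client's partition."""
    # CIFAR-10 and a contiguous per-client shard are assumptions for illustration
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0

    # Give each client a contiguous shard of the training set
    shard = len(x_train) // total_clients
    start = (client_id - 1) * shard
    x_train, y_train = x_train[start:start + shard], y_train[start:start + shard]

    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))

    # Subsample only the training data; keep the full (shuffled) test set
    train_subset_size = int(len(x_train) * data_sampling_percentage)
    train_dataset = train_dataset.shuffle(buffer_size=len(x_train)).take(train_subset_size)
    test_dataset = test_dataset.shuffle(buffer_size=len(x_test))

    return train_dataset.batch(batch_size), test_dataset.batch(batch_size)
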
2 changes: 1 addition & 1 deletion examples/flower-via-docker-compose/model/model.py
@@ -2,7 +2,7 @@

# Class for the model. In this case, we are using the MobileNetV2 model from Keras
class Model():
def __init__(self, learning_rate=0.01):
def __init__(self, learning_rate):
self.learning_rate = learning_rate
self.loss_function = tf.keras.losses.SparseCategoricalCrossentropy()
self.model = tf.keras.applications.MobileNetV2((32, 32, 3), alpha=0.1, classes=10, weights=None)
2 changes: 1 addition & 1 deletion examples/flower-via-docker-compose/requirements.txt
@@ -1,4 +1,4 @@
flwr==1.5.0
flwr==1.6.0
tensorflow==2.13.1
psutil==5.9.6
ping3==4.0.4
6 changes: 4 additions & 2 deletions examples/flower-via-docker-compose/server.py
@@ -40,6 +40,8 @@ def start_fl_server(strategy, rounds):
start_http_server(8000)

# Initialize Strategy Instance and Start FL Server
strategy_instance = FedCustom(accuracy_gauge = accuracy_gauge, loss_gauge = loss_gauge, total_clients = args.total_clients)
start_fl_server(strategy_instance, args.number_of_rounds)
strategy_instance = FedCustom(accuracy_gauge=accuracy_gauge,
loss_gauge=loss_gauge,
total_clients=args.total_clients)
start_fl_server(strategy=strategy_instance, rounds=args.number_of_rounds)
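
For reference, start_fl_server (whose body is not shown in this diff) presumably wraps flwr's blocking server entry point; a minimal sketch of that shape under flwr 1.6, with the listen address as an assumption:

import flwr as fl

def start_fl_server(strategy, rounds):
    # Blocking call: run the given number of federated rounds with this strategy
    fl.server.start_server(
        server_address="0.0.0.0:8080",  # assumed listen address
        config=fl.server.ServerConfig(num_rounds=rounds),
        strategy=strategy,
    )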

115 changes: 8 additions & 107 deletions examples/flower-via-docker-compose/strategy/strategy.py
@@ -20,95 +20,21 @@
logging.basicConfig(level=logging.INFO) # Configure logging
logger = logging.getLogger(__name__) # Create logger for the module


class FedCustom(fl.server.strategy.Strategy):
def __init__(
self,
fraction_fit: float = 1,
fraction_evaluate: float = 1,
min_fit_clients: int = 2,
min_evaluate_clients: int = 2,
min_available_clients: int = 2,
initial_parameters: Optional[Parameters] = None,
total_clients: int = 2,

class FedCustom(fl.server.strategy.FedAvg):
def __init__(self, total_clients: int = 2,
accuracy_gauge: Gauge = None,
loss_gauge: Gauge = None,
) -> None:
super().__init__()
self.fraction_fit = fraction_fit
self.fraction_evaluate = fraction_evaluate
self.min_fit_clients = min_fit_clients
self.min_evaluate_clients = min_evaluate_clients
self.min_available_clients = min_available_clients
self.initial_parameters = initial_parameters
loss_gauge: Gauge = None,*args, **kwargs):

super().__init__(*args, **kwargs)

self.total_clients = total_clients
self.accuracy_gauge = accuracy_gauge
self.loss_gauge = loss_gauge

def __repr__(self) -> str:
return "FedCustom"

def initialize_parameters(
self, client_manager: ClientManager
) -> Optional[Parameters]:
"""Initialize global model parameters."""
initial_parameters = self.initial_parameters
self.initial_parameters = None
return initial_parameters


def configure_fit(
self, server_round: int, parameters: Parameters, client_manager: ClientManager
) -> List[Tuple[ClientProxy, FitIns]]:
"""Configure the next round of training."""
logger.info(f"Configuring fit for server round {server_round}.")

sample_size, min_num_clients = self.num_fit_clients(client_manager.num_available(), server_round)
clients = client_manager.sample(
num_clients=sample_size, min_num_clients=min_num_clients
)

fit_configurations = [
(client, FitIns(parameters, {}))
for idx, client in enumerate(clients)
]

return fit_configurations


def aggregate_fit(
self,
server_round: int,
results: List[Tuple[ClientProxy, FitRes]],
failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
"""Aggregate fit results using weighted average."""
logger.info(f"Aggregating fit results for server round {server_round}.")
weights_results = [
(parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
for _, fit_res in results
]
parameters_aggregated = ndarrays_to_parameters(aggregate(weights_results))
metrics_aggregated = {}
return parameters_aggregated, metrics_aggregated



def configure_evaluate(
self, server_round: int, parameters: Parameters, client_manager: ClientManager
) -> List[Tuple[ClientProxy, EvaluateIns]]:
"""Configure the next round of evaluation."""
if self.fraction_evaluate == 0.0:
return []
evaluate_ins = EvaluateIns(parameters, {})
sample_size, min_num_clients = self.num_evaluation_clients(
client_manager.num_available(), server_round
)
clients = client_manager.sample(
num_clients=sample_size, min_num_clients=min_num_clients
)
return [(client, evaluate_ins) for client in clients]

def aggregate_evaluate(
self,
server_round: int,
@@ -119,8 +45,6 @@ def aggregate_evaluate(

if not results:
return None, {}

logger.info(f"Aggregating evaluation results for server round {server_round} are {results}.")

# Calculate weighted average for loss using the provided function
loss_aggregated = weighted_loss_avg(
@@ -142,27 +66,4 @@ def aggregate_evaluate(
}

return loss_aggregated, metrics_aggregated

def evaluate(
self, server_round: int, parameters: Parameters
) -> Optional[Tuple[float, Dict[str, Scalar]]]:
"""Evaluate global model parameters using an evaluation function."""
logger.info(f"Evaluating model for server round {server_round}.")
return None

def num_fit_clients(self, num_available_clients: int, server_round: int) -> Tuple[int, int]:
"""Return sample size and required number of clients."""
min_clients = self.min_available_clients if server_round > 1 else self.total_clients
num_clients = int(num_available_clients * self.fraction_fit)
return max(num_clients, self.min_fit_clients), min_clients


def num_evaluation_clients(self, num_available_clients: int,server_round:int) -> Tuple[int, int]:
"""Use a fraction of available clients for evaluation."""
min_clients = self.min_available_clients if server_round > 1 else self.total_clients
num_clients = int(num_available_clients * self.fraction_evaluate)
return max(num_clients, self.min_evaluate_clients), min_clients
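
To summarize the simplification: by subclassing FedAvg, the strategy keeps only a custom __init__ and aggregate_evaluate, and all of the sampling, configuration, and fit-aggregation logic removed above is inherited from the parent class. A condensed, self-contained sketch of that shape follows; the Prometheus gauge names are placeholders and the example's exact aggregate_evaluate body may differ.

import flwr as fl
from flwr.server.strategy.aggregate import weighted_loss_avg
from prometheus_client import Gauge

class FedCustomSketch(fl.server.strategy.FedAvg):
    def __init__(self, total_clients=2, accuracy_gauge=None, loss_gauge=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.total_clients = total_clients
        self.accuracy_gauge = accuracy_gauge
        self.loss_gauge = loss_gauge

    def aggregate_evaluate(self, server_round, results, failures):
        if not results:
            return None, {}
        # Example-count-weighted average loss over all client evaluations
        loss_aggregated = weighted_loss_avg(
            [(evaluate_res.num_examples, evaluate_res.loss) for _, evaluate_res in results]
        )
        # Example-count-weighted average accuracy from client-reported metrics
        total_examples = sum(evaluate_res.num_examples for _, evaluate_res in results)
        accuracy_aggregated = sum(
            evaluate_res.metrics["accuracy"] * evaluate_res.num_examples
            for _, evaluate_res in results
        ) / total_examples
        # Export the aggregated metrics to Prometheus for the Grafana dashboard
        if self.loss_gauge is not None:
            self.loss_gauge.set(loss_aggregated)
        if self.accuracy_gauge is not None:
            self.accuracy_gauge.set(accuracy_aggregated)
        return loss_aggregated, {"accuracy": accuracy_aggregated}

# Hypothetical usage: the gauge names are placeholders
accuracy_gauge = Gauge("model_accuracy", "Aggregated model accuracy across clients")
loss_gauge = Gauge("model_loss", "Aggregated model loss across clients")
strategy = FedCustomSketch(accuracy_gauge=accuracy_gauge, loss_gauge=loss_gauge, total_clients=2)
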




