diff --git a/README.md b/README.md index 8e32fcde22a0..17a96ddab527 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,7 @@ Flower Baselines is a collection of community-contributed projects that reproduc - [MOON](https://github.com/adap/flower/tree/main/baselines/moon) - [niid-Bench](https://github.com/adap/flower/tree/main/baselines/niid_bench) - [TAMUNA](https://github.com/adap/flower/tree/main/baselines/tamuna) +- [FedPara](https://github.com/adap/flower/tree/main/baselines/fedpara) - [FedAvg](https://github.com/adap/flower/tree/main/baselines/flwr_baselines/flwr_baselines/publications/fedavg_mnist) - [FedOpt](https://github.com/adap/flower/tree/main/baselines/flwr_baselines/flwr_baselines/publications/adaptive_federated_optimization) diff --git a/baselines/fedpara/.gitignore b/baselines/fedpara/.gitignore new file mode 100644 index 000000000000..6244dfada6ee --- /dev/null +++ b/baselines/fedpara/.gitignore @@ -0,0 +1,4 @@ +outputs/ +multirun/ +client_states/ +data/ \ No newline at end of file diff --git a/baselines/fedpara/LICENSE b/baselines/fedpara/LICENSE new file mode 100644 index 000000000000..d64569567334 --- /dev/null +++ b/baselines/fedpara/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/baselines/fedpara/README.md b/baselines/fedpara/README.md new file mode 100644 index 000000000000..068366aa261c --- /dev/null +++ b/baselines/fedpara/README.md @@ -0,0 +1,185 @@ +--- +title: "FedPara: Low-rank Hadamard Product for Communication-Efficient Federated Learning" +url: https://openreview.net/forum?id=d71n4ftoCBy +labels: [image classification, personalization, low-rank training, tensor decomposition] +dataset: [CIFAR-10, CIFAR-100, MNIST] +--- + +# FedPara: Low-rank Hadamard Product for Communication-Efficient Federated Learning + +> Note: If you use this baseline in your work, please remember to cite the original authors of the paper as well as the Flower paper. + +**Paper:** [openreview.net/forum?id=d71n4ftoCBy](https://openreview.net/forum?id=d71n4ftoCBy) + +**Authors:** Nam Hyeon-Woo, Moon Ye-Bin, Tae-Hyun Oh + +**Abstract:** In this work, we propose a communication-efficient parameterization, FedPara, +for federated learning (FL) to overcome the burdens on frequent model uploads +and downloads. Our method re-parameterizes weight parameters of layers using +low-rank weights followed by the Hadamard product. Compared to the conventional low-rank parameterization, our FedPara method is not restricted to lowrank constraints, and thereby it has a far larger capacity. This property enables to +achieve comparable performance while requiring 3 to 10 times lower communication costs than the model with the original layers, which is not achievable by +the traditional low-rank methods. The efficiency of our method can be further improved by combining with other efficient FL optimizers. In addition, we extend +our method to a personalized FL application, pFedPara, which separates parameters into global and local ones. We show that pFedPara outperforms competing +personalized FL methods with more than three times fewer parameters. Project +page: https://github.com/South-hw/FedPara_ICLR22 + + + +## About this baseline + +**What’s implemented:** The code in this directory replicates the experiments in FedPara paper implementing the Low-rank scheme for Convolution module. + +Specifically, it replicates the results for CIFAR-10 and CIFAR-100 in Figure 3 and the results for MNIST in Figure 5(c). + + +**Datasets:** CIFAR-10, CIFAR-100, MNIST from PyTorch's Torchvision + +**Hardware Setup:** The experiments have been conducted on our server with the following specs: + +- **GPU:** 1x RTX A6000 GPU with 48GB VRAM +- **CPU:** 1x24 cores Intel Xeon(R) 6248R +- **RAM:** 150 GB + +On a machine with RTX 3090Ti (24GB VRAM) it takes approximately 1h to run each CIFAR-10/100 experiment while using < 12GB of VRAM. You can lower the VRAM footprint my reducing the number of clients allowed to run in parallel in your GPU (do this by raising `client_resources.num_gpus`). + + +**Contributors:** Yahia Salaheldin Shaaban, Omar Mokhtar and Roeia Amr + + +## Experimental Setup + +**Task:** Image classification + +**Model:** This baseline implements VGG16 with group normalization. + +**Dataset:** + +| Dataset | #classes | #partitions | partitioning method IID | partitioning method non-IID | +|:---------|:--------:|:-----------:|:----------------------:| :----------------------:| +| CIFAR-10 | 10 | 100 | random split | Dirichlet distribution ($\alpha=0.5$)| +| CIFAR-100 | 100 | 50 | random split| Dirichlet distribution ($\alpha=0.5$)| + + +**Training Hyperparameters:** + +| | Cifar10 IID | Cifar10 Non-IID | Cifar100 IID | Cifar100 Non-IID | MNIST | +|---|-------|-------|------|-------|----------| +| Fraction of client (K) | 16 | 16 | 8 | 8 | 10 | +| Total rounds (T) | 200 | 200 | 400 | 400 | 100 | +| Number of SGD epochs (E) | 10 | 5 | 10 | 5 | 5 | +| Batch size (B) | 64 | 64 | 64 | 64 | 10 | +| Initial learning rate (η) | 0.1 | 0.1 | 0.1 | 0.1 | 0.1-0.01 | +| Learning rate decay (τ) | 0.992 | 0.992 | 0.992| 0.992 | 0.999 | +| Regularization coefficient (λ) | 1 | 1 | 1 | 1 | 0 | + +As for the parameters ratio ($\gamma$) we use the following model sizes. As in the paper, $\gamma=0.1$ is used for CIFAR-10 and $\gamma=0.4$ for CIFAR-100: + +| Parameters ratio ($\gamma$) | CIFAR-10 | CIFAR-100 | +|----------|--------|--------| +| 1.0 (original) | 15.25M | 15.30M | +| 0.1 | 1.55M | - | +| 0.4 | - | 4.53M | + + +### Notes: +- Notably, Fedpara's low-rank training technique heavily relies on initialization, with our experiments revealing that employing a 'Fan-in' He initialization (or Kaiming) renders the model incapable of convergence, resulting in a performance akin to that of a random classifier. We found that only Fan-out initialization yielded the anticipated results, and we postulated that this is attributed to the variance conservation during backward propagation. + +- The paper lacks explicit guidance on calculating the rank, aside from the "Rank_min - Rank_max" equation. To address this, we devised an equation aligning with the literature's explanation and constraint, solving a quadratic equation to determine max_rank and utilizing proposition 2 from the paper to establish min_rank. + +- The Jacobian correction was not incorporated into our implementation, primarily due to the lack of explicit instructions in the paper regarding the specific implementation of the dual update principle mentioned in the Jacobian correction section. + +- It was observed that data generation is crutial for model convergence + +## Environment Setup +To construct the Python environment follow these steps: + +It is assumed that `pyenv` is installed, `poetry` is installed and python 3.10.6 is installed using `pyenv`. Refer to this [documentation](https://flower.dev/docs/baselines/how-to-usef-baselines.html#setting-up-your-machine) to ensure that your machine is ready. + +```bash +# Set Python 3.10 +pyenv local 3.10.6 +# Tell poetry to use python 3.10 +poetry env use 3.10.6 + +# Install the base Poetry environment +poetry install + +# Activate the environment +poetry shell +``` + +## Running the Experiments + +Running `FedPara` is easy. You can run it with default parameters directly or by tweaking them directly on the command line. Some command examples are shown below. + +```bash +# To run fedpara with default parameters +python -m fedpara.main + +# Run for more rounds and a different number of local epochs +python -m fedpara.main num_rounds=2024 num_epochs=1 + +# Choose parameterization scheme: lowrank or original (normal weights) +python -m fedpara.main model.param_type=standard # or lowrank (default) + +# Choosing between non IID and IID scheme +python -m fedpara.main dataset_config.partition=iid # or non-iid (default) + +# Choosing the ratio (lambda) of number of parameters to communicate +python -m fedpara.main model.ratio=0.1 + +# Choosing the CIFAR-100 config +python -m fedpara.main --config-name cifar100 # change settings as shown above if desired +``` + +## Expected Results + +To reproduce the curves shown below (which correspond to those in Figure 3 in the paper), run the following commands. Experiments running with `model.param_type=lowrank` correspond to those with `-FedPara` in the legend of the figures below. Those with `model.param_type=standard` are labelled with the `-orig` (as original) tag. + +```bash +# To run fedpara for non-iid CIFAR-10 on vgg16 for lowrank and original schemes +python -m fedpara.main --multirun model.param_type=standard,lowrank +# To run fedpara for non-iid CIFAR-100 on vgg16 for lowrank and original schemes +python -m fedpara.main --config-name cifar100 --multirun model.param_type=standard,lowrank +# To run fedpara for iid CIFAR-10 on vgg16 for lowrank and original schemes +python -m fedpara.main --multirun model.param_type=standard,lowrank num_epochs=10 dataset_config.partition=iid +# To run fedpara for iid CIFAR-100 on vgg16 for lowrank and original schemes +python -m fedpara.main --config-name cifar100 --multirun model.param_type=standard,lowrank num_epochs=10 dataset_config.partition=iid +# To run fedavg for non-iid MINST on FC +python -m fedpara.main --config-name mnist_fedavg +# To run fedper for non-iid MINST on FC +python -m fedpara.main --config-name mnist_fedper +# To run pfedpara for non-iid MINST on FC +python -m fedpara.main --config-name mnist_pfedpara +``` + +#### Communication Cost: +Communication costs as measured as described in the paper: +*"FL evaluation typically measures the required rounds to achieve the target accuracy as communication costs, but we instead assess total transferred bit sizes, 2 × +(#participants)×(model size)×(#rounds)"* + + +### CIFAR-100 (Accuracy vs Communication Cost) + +| IID | Non-IID | +|:----:|:----:| +|![Cifar100 iid](_static/Cifar100_iid.jpeg) | ![Cifar100 non-iid](_static/Cifar100_noniid.jpeg) | + + +### CIFAR-10 (Accuracy vs Communication Cost) + +| IID | Non-IID | +|:----:|:----:| +|![CIFAR10 iid](_static/Cifar10_iid.jpeg) | ![CIFAR10 non-iid](_static/Cifar10_noniid.jpeg) | + +### NON-IID MINST (FedAvg vs FedPer vs pFedPara) + +The only federated averaging (FedAvg) implementation replicates the results outlined in the paper. However, challenges with convergence were encountered when applying `pFedPara` and `FedPer` methods. + +![Personalization algorithms](_static/non-iid_mnist_personalization.png) + +## Code Acknowledgments +Our code is inspired from these repos: +- [Fedpara low rank tensor CNN class structure](https://github.com/South-hw/FedPara_ICLR22) +- [Non-IID mnist data preparation](https://github.com/nimeshagrawal/FedAvg-Pytorch) +- [Cifar non IID data generation](https://github.com/guobbin/PFL-MoE) diff --git a/baselines/fedpara/_static/Cifar100_iid.jpeg b/baselines/fedpara/_static/Cifar100_iid.jpeg new file mode 100644 index 000000000000..c8b94f94670d Binary files /dev/null and b/baselines/fedpara/_static/Cifar100_iid.jpeg differ diff --git a/baselines/fedpara/_static/Cifar100_noniid.jpeg b/baselines/fedpara/_static/Cifar100_noniid.jpeg new file mode 100644 index 000000000000..f76de3c46b02 Binary files /dev/null and b/baselines/fedpara/_static/Cifar100_noniid.jpeg differ diff --git a/baselines/fedpara/_static/Cifar10_iid.jpeg b/baselines/fedpara/_static/Cifar10_iid.jpeg new file mode 100644 index 000000000000..bc12f28ea4da Binary files /dev/null and b/baselines/fedpara/_static/Cifar10_iid.jpeg differ diff --git a/baselines/fedpara/_static/Cifar10_noniid.jpeg b/baselines/fedpara/_static/Cifar10_noniid.jpeg new file mode 100644 index 000000000000..2f510f047318 Binary files /dev/null and b/baselines/fedpara/_static/Cifar10_noniid.jpeg differ diff --git a/baselines/fedpara/_static/non-iid_mnist_personalization.png b/baselines/fedpara/_static/non-iid_mnist_personalization.png new file mode 100644 index 000000000000..4b09de2c2836 Binary files /dev/null and b/baselines/fedpara/_static/non-iid_mnist_personalization.png differ diff --git a/baselines/fedpara/fedpara/__init__.py b/baselines/fedpara/fedpara/__init__.py new file mode 100644 index 000000000000..a5e567b59135 --- /dev/null +++ b/baselines/fedpara/fedpara/__init__.py @@ -0,0 +1 @@ +"""Template baseline package.""" diff --git a/baselines/fedpara/fedpara/client.py b/baselines/fedpara/fedpara/client.py new file mode 100644 index 000000000000..a435e82c6bf8 --- /dev/null +++ b/baselines/fedpara/fedpara/client.py @@ -0,0 +1,197 @@ +"""Client for FedPara.""" + +import copy +import os +from collections import OrderedDict +from typing import Callable, Dict, List, Optional, Tuple + +import flwr as fl +import torch +from flwr.common import NDArrays, Scalar +from hydra.utils import instantiate +from omegaconf import DictConfig +from torch.utils.data import DataLoader + +from fedpara.models import test, train +from fedpara.utils import get_keys_state_dict + + +class FlowerClient(fl.client.NumPyClient): + """Standard Flower client for CNN training.""" + + def __init__( + self, + cid: int, + net: torch.nn.Module, + train_loader: DataLoader, + device: str, + num_epochs: int, + ): # pylint: disable=too-many-arguments + self.cid = cid + self.net = net + self.train_loader = train_loader + self.device = torch.device(device) + self.num_epochs = num_epochs + + def get_parameters(self, config: Dict[str, Scalar]) -> NDArrays: + """Return the parameters of the current net.""" + return [val.cpu().numpy() for _, val in self.net.state_dict().items()] + + def set_parameters(self, parameters: NDArrays) -> None: + """Apply parameters to model state dict.""" + params_dict = zip(self.net.state_dict().keys(), parameters) + state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict}) + self.net.load_state_dict(state_dict, strict=True) + + def fit( + self, parameters: NDArrays, config: Dict[str, Scalar] + ) -> Tuple[NDArrays, int, Dict]: + """Train the network on the training set.""" + self.set_parameters(parameters) + + train( + self.net, + self.train_loader, + self.device, + epochs=self.num_epochs, + hyperparams=config, + epoch=int(config["curr_round"]), + ) + + return ( + self.get_parameters({}), + len(self.train_loader), + {}, + ) + + +# pylint: disable=too-many-instance-attributes +class PFlowerClient(fl.client.NumPyClient): + """Personalized Flower Client.""" + + def __init__( # pylint: disable=too-many-arguments + self, + cid: int, + net: torch.nn.Module, + train_loader: DataLoader, + test_loader: DataLoader, + device: str, + num_epochs: int, + state_path: str, + algorithm: str, + ): + self.cid = cid + self.net = net + self.train_loader = train_loader + self.test_loader = test_loader + self.device = torch.device(device) + self.num_epochs = num_epochs + self.state_path = state_path + self.algorithm = algorithm + self.private_server_param: Dict[str, torch.Tensor] = {} + + def get_parameters(self, config: Dict[str, Scalar]) -> NDArrays: + """Return the parameters of the current net.""" + model_dict = self.net.state_dict().copy() + # overwrite the server private parameters + for k in self.private_server_param.keys(): + model_dict[k] = self.private_server_param[k] + return [val.cpu().numpy() for _, val in model_dict.items()] + + def set_parameters(self, parameters: NDArrays, evaluate: bool) -> None: + """Apply parameters to model state dict.""" + params_dict = zip(self.net.state_dict().keys(), parameters) + server_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict}) + self.private_server_param = { + k: server_dict[k] + for k in get_keys_state_dict( + model=self.net, algorithm=self.algorithm, mode="local" + ) + } + + if evaluate: + client_dict = self.net.state_dict().copy() + else: + client_dict = copy.deepcopy(server_dict) + + if os.path.isfile(self.state_path): + with open(self.state_path, "rb") as f: + client_dict = torch.load(f) + + for k in get_keys_state_dict( + model=self.net, algorithm=self.algorithm, mode="global" + ): + client_dict[k] = server_dict[k] + + self.net.load_state_dict(client_dict, strict=False) + + def fit( + self, parameters: NDArrays, config: Dict[str, Scalar] + ) -> Tuple[NDArrays, int, Dict]: + """Train the network on the training set.""" + self.set_parameters(parameters, evaluate=False) + + train( + self.net, + self.train_loader, + self.device, + epochs=self.num_epochs, + hyperparams=config, + epoch=int(config["curr_round"]), + ) + if self.state_path is not None: + with open(self.state_path, "wb") as f: + torch.save(self.net.state_dict(), f) + + return ( + self.get_parameters({}), + len(self.train_loader), + {}, + ) + + def evaluate( + self, parameters: NDArrays, config: Dict[str, Scalar] + ) -> Tuple[float, int, Dict]: + """Evaluate the network on the test set.""" + self.set_parameters(parameters, evaluate=True) + self.net.to(self.device) + loss, accuracy = test(self.net, self.test_loader, device=self.device) + return loss, len(self.test_loader), {"accuracy": accuracy} + + +# pylint: disable=too-many-arguments +def gen_client_fn( + train_loaders: List[DataLoader], + model: DictConfig, + num_epochs: int, + args: Dict, + test_loader: Optional[List[DataLoader]] = None, + state_path: Optional[str] = None, +) -> Callable[[str], fl.client.NumPyClient]: + """Return a function which creates a new FlowerClient for a given cid.""" + + def client_fn(cid: str) -> fl.client.NumPyClient: + """Create a new FlowerClient for a given cid.""" + cid_ = int(cid) + device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + if args["algorithm"].lower() == "pfedpara" or args["algorithm"] == "fedper": + cl_path = f"{state_path}/client_{cid_}.pth" + return PFlowerClient( + cid=cid_, + net=instantiate(model).to(device), + train_loader=train_loaders[cid_], + test_loader=copy.deepcopy(test_loader), + num_epochs=num_epochs, + state_path=cl_path, + algorithm=args["algorithm"].lower(), + device=device, + ) + return FlowerClient( + cid=cid_, + net=instantiate(model).to(device), + train_loader=train_loaders[cid_], + num_epochs=num_epochs, + device=device, + ) + + return client_fn diff --git a/baselines/fedpara/fedpara/conf/cifar10.yaml b/baselines/fedpara/fedpara/conf/cifar10.yaml new file mode 100644 index 000000000000..97ec2520c3c1 --- /dev/null +++ b/baselines/fedpara/fedpara/conf/cifar10.yaml @@ -0,0 +1,43 @@ +--- +seed: 121 + +num_clients: 100 +num_rounds: 200 +clients_per_round: 16 +num_epochs: 5 +batch_size: 64 +algorithm: FedPara + +server_device: cuda + +client_resources: + num_cpus: 2 + num_gpus: 0.125 + +dataset_config: + name: CIFAR10 + partition: non-iid + num_classes: 10 + alpha: 0.5 + +model: + _target_: fedpara.models.VGG + num_classes: ${dataset_config.num_classes} + param_type: lowrank # lowrank or standard + ratio: 0.1 # lowrank ratio + +hyperparams: + eta_l: 0.1 + learning_decay: 0.992 + + +strategy: + _target_: flwr.server.strategy.FedAvg + fraction_fit: 0.00001 + fraction_evaluate: 0.0 + min_evaluate_clients: 0 + min_fit_clients: ${clients_per_round} + min_available_clients: ${clients_per_round} + accept_failures: false + +exp_id: ${model.param_type}_${dataset_config.name}_${dataset_config.partition}_alpha${dataset_config.alpha} diff --git a/baselines/fedpara/fedpara/conf/cifar100.yaml b/baselines/fedpara/fedpara/conf/cifar100.yaml new file mode 100644 index 000000000000..d1023014605b --- /dev/null +++ b/baselines/fedpara/fedpara/conf/cifar100.yaml @@ -0,0 +1,42 @@ +--- +seed: 342130 + +num_clients: 50 +num_rounds: 400 +clients_per_round: 8 +num_epochs: 5 +batch_size: 64 +algorithm: FedPara + +server_device: cuda + +client_resources: + num_cpus: 2 + num_gpus: 0.125 + +dataset_config: + name: CIFAR100 + partition: non-iid + num_classes: 100 + alpha: 0.5 + +model: + _target_: fedpara.models.VGG + num_classes: ${dataset_config.num_classes} + param_type: lowrank # lowrank or standard + ratio: 0.4 # lowrank ratio + +hyperparams: + eta_l: 0.1 + learning_decay: 0.992 + +strategy: + _target_: flwr.server.strategy.FedAvg + fraction_fit: 0.00001 + fraction_evaluate: 0.0 + min_evaluate_clients: 0 + min_fit_clients: ${clients_per_round} + min_available_clients: ${clients_per_round} + accept_failures: false + +exp_id: ${model.param_type}_${dataset_config.name}_${dataset_config.partition}_alpha${dataset_config.alpha} \ No newline at end of file diff --git a/baselines/fedpara/fedpara/conf/mnist_fedavg.yaml b/baselines/fedpara/fedpara/conf/mnist_fedavg.yaml new file mode 100644 index 000000000000..d0cda9a62e76 --- /dev/null +++ b/baselines/fedpara/fedpara/conf/mnist_fedavg.yaml @@ -0,0 +1,38 @@ +--- +seed: 424 + +num_clients: 100 +num_rounds: 100 +clients_per_round: 10 +num_epochs: 1 +batch_size: 10 +server_device: cuda +algorithm: fedavg + +client_resources: + num_cpus: 2 + num_gpus: 0.1 + +dataset_config: + name: MNIST + num_classes: 10 + shard_size: 300 + +model: + _target_: fedpara.models.FC + num_classes: ${dataset_config.num_classes} + hidden_size: 200 + +hyperparams: + eta_l: 0.05 + learning_decay: 1 + +strategy: + _target_: flwr.server.strategy.FedAvg + fraction_fit: 0.00001 + fraction_evaluate: 0 + min_evaluate_clients: 0 + min_fit_clients: ${clients_per_round} + min_available_clients: ${clients_per_round} + +exp_id: ${algorithm}_${dataset_config.name} diff --git a/baselines/fedpara/fedpara/conf/mnist_fedper.yaml b/baselines/fedpara/fedpara/conf/mnist_fedper.yaml new file mode 100644 index 000000000000..dd9034284459 --- /dev/null +++ b/baselines/fedpara/fedpara/conf/mnist_fedper.yaml @@ -0,0 +1,45 @@ +--- +seed: 17 + +num_clients: 100 +num_rounds: 100 +clients_per_round: 10 +num_epochs: 1 +batch_size: 10 +state_path: ./client_states/ +server_device: cuda +client_device: cuda + +algorithm: fedper +# fedavg in future + +client_resources: + num_cpus: 2 + num_gpus: 0.1 + +dataset_config: + name: MNIST + num_classes: 10 + shard_size: 300 + + data_seed: ${seed} + +model: + _target_: fedpara.models.FC + num_classes: ${dataset_config.num_classes} + hidden_size: 200 + +hyperparams: + eta_l: 0.05 + learning_decay: 0 + +strategy: + _target_: flwr.server.strategy.FedAvg + fraction_fit: 0.00001 + fraction_evaluate: 0.00001 + min_evaluate_clients: ${clients_per_round} + min_fit_clients: ${clients_per_round} + min_available_clients: ${clients_per_round} + + +exp_id: ${algorithm}_${dataset_config.name} diff --git a/baselines/fedpara/fedpara/conf/mnist_pfedpara.yaml b/baselines/fedpara/fedpara/conf/mnist_pfedpara.yaml new file mode 100644 index 000000000000..5733397def0f --- /dev/null +++ b/baselines/fedpara/fedpara/conf/mnist_pfedpara.yaml @@ -0,0 +1,41 @@ +--- +seed: 17 + +num_clients: 100 +num_rounds: 100 +clients_per_round: 10 +num_epochs: 5 +batch_size: 10 +state_path: ./client_states/ +server_device: cuda +algorithm: pFedpara + +client_resources: + num_cpus: 2 + num_gpus: 0.1 + +dataset_config: + name: MNIST + num_classes: 10 + shard_size: 300 + +model: + _target_: fedpara.models.FC + num_classes: ${dataset_config.num_classes} + param_type: lowrank # lowrank or standard + ratio: 0.5 # lowrank ratio + hidden_size: 200 + +hyperparams: + eta_l: 0.01 + learning_decay: 0.999 + +strategy: + _target_: flwr.server.strategy.FedAvg + fraction_fit: 0.00001 + fraction_evaluate: 0.00001 + min_evaluate_clients: ${clients_per_round} + min_fit_clients: ${clients_per_round} + min_available_clients: ${clients_per_round} + +exp_id: ${algorithm}_${dataset_config.name}_${model.param_type}_${model.ratio} diff --git a/baselines/fedpara/fedpara/dataset.py b/baselines/fedpara/fedpara/dataset.py new file mode 100644 index 000000000000..c76a75d982f6 --- /dev/null +++ b/baselines/fedpara/fedpara/dataset.py @@ -0,0 +1,139 @@ +"""Dataset loading and processing utilities.""" + +import pickle +from typing import List, Tuple + +from torch.utils.data import DataLoader +from torchvision import datasets, transforms + +from fedpara.dataset_preparation import ( + DatasetSplit, + iid, + noniid, + noniid_partition_loader, +) + + +def load_datasets( + config, num_clients, batch_size +) -> Tuple[List[DataLoader], DataLoader]: + """Load the dataset and return the dataloaders for the clients and the server.""" + print("Loading data...") + match config["name"]: + case "CIFAR10": + Dataset = datasets.CIFAR10 + case "CIFAR100": + Dataset = datasets.CIFAR100 + case "MNIST": + Dataset = datasets.MNIST + case _: + raise NotImplementedError + data_directory = f"./data/{config['name'].lower()}/" + match config["name"]: + case "CIFAR10" | "CIFAR100": + ds_path = f"{data_directory}train_{num_clients}_{config.alpha:.2f}.pkl" + transform_train = transforms.Compose( + [ + transforms.RandomCrop(32, padding=4), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize( + (0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010) + ), + ] + ) + transform_test = transforms.Compose( + [ + transforms.ToTensor(), + transforms.Normalize( + (0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010) + ), + ] + ) + try: + with open(ds_path, "rb") as file: + train_datasets = pickle.load(file).values() + dataset_train = Dataset( + data_directory, + train=True, + download=False, + transform=transform_train, + ) + dataset_test = Dataset( + data_directory, + train=False, + download=False, + transform=transform_test, + ) + except FileNotFoundError: + dataset_train = Dataset( + data_directory, train=True, download=True, transform=transform_train + ) + if config.partition == "iid": + train_datasets = iid(dataset_train, num_clients) + else: + train_datasets, _ = noniid(dataset_train, num_clients, config.alpha) + pickle.dump(train_datasets, open(ds_path, "wb")) + train_datasets = train_datasets.values() + dataset_test = Dataset( + data_directory, train=False, download=True, transform=transform_test + ) + + case "MNIST": + ds_path = f"{data_directory}train_{num_clients}.pkl" + transform_train = transforms.Compose( + [ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)), + ] + ) + transform_test = transforms.Compose( + [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] + ) + try: + train_datasets = pickle.load(open(ds_path, "rb")) + dataset_train = Dataset( + data_directory, + train=True, + download=False, + transform=transform_train, + ) + dataset_test = Dataset( + data_directory, + train=False, + download=False, + transform=transform_test, + ) + + except FileNotFoundError: + dataset_train = Dataset( + data_directory, train=True, download=True, transform=transform_train + ) + train_datasets = noniid_partition_loader( + dataset_train, + m_per_shard=config.shard_size, + n_shards_per_client=len(dataset_train) // (config.shard_size * 100), + ) + pickle.dump(train_datasets, open(ds_path, "wb")) + dataset_test = Dataset( + data_directory, train=False, download=True, transform=transform_test + ) + train_loaders = [ + DataLoader(x, batch_size=batch_size, shuffle=True) + for x in train_datasets + ] + test_loader = DataLoader(dataset_test, batch_size=batch_size, num_workers=2) + return train_loaders, test_loader + + test_loader = DataLoader(dataset_test, batch_size=batch_size, num_workers=2) + train_loaders = [ + DataLoader( + DatasetSplit(dataset_train, ids), + batch_size=batch_size, + shuffle=True, + num_workers=2, + ) + for ids in train_datasets + ] + + return train_loaders, test_loader diff --git a/baselines/fedpara/fedpara/dataset_preparation.py b/baselines/fedpara/fedpara/dataset_preparation.py new file mode 100644 index 000000000000..2f3922e97cef --- /dev/null +++ b/baselines/fedpara/fedpara/dataset_preparation.py @@ -0,0 +1,160 @@ +"""Handle the dataset partitioning and (optionally) complex downloads. + +Please add here all the necessary logic to either download, uncompress, pre/post-process +your dataset (or all of the above). If the desired way of running your baseline is to +first download the dataset and partition it and then run the experiments, please +uncomment the lines below and tell us in the README.md (see the "Running the Experiment" +block) that this file should be executed first. +""" + +import random +from collections import defaultdict + +import numpy as np +import torch +from torch.utils.data import Dataset + + +class DatasetSplit(Dataset): + """An abstract Dataset class wrapped around Pytorch Dataset class.""" + + def __init__(self, dataset, idxs): + self.dataset = dataset + self.targets = dataset.targets + self.idxs = list(idxs) + + def __len__(self): + """Return number of images.""" + return len(self.idxs) + + def __getitem__(self, item): + """Return a transformed example of the dataset.""" + image, label = self.dataset[self.idxs[item]] + return image, label + + +def iid(dataset, num_users): + """Sample I.I.D. clients data from a dataset. + + Args: + dataset: dataset object + num_users: number of users + Returns: + dict of image index + """ + num_items = int(len(dataset) / num_users) + dict_users, all_idxs = {}, list(range(len(dataset))) + for i in range(num_users): + dict_users[i] = set(np.random.choice(all_idxs, num_items, replace=False)) + all_idxs = list(set(all_idxs) - dict_users[i]) + return dict_users + + +# pylint: disable=too-many-locals +def noniid(dataset, no_participants, alpha=0.5): + """Sample non-I.I.D client data from dataset. + + Args: + dataset: dataset object + no_participants: number of users + alpha: float parameter for dirichlet distribution + Returns: + dict of image index + Requires: + cifar_classes, a preprocessed class-indice dictionary. + Sample Method: take a uniformly sampled 10/100-dimension vector + as parameters for dirichlet distribution to sample number of + images in each class. + """ + np.random.seed(666) + random.seed(666) + cifar_classes = {} + for ind, x in enumerate(dataset): + _, label = x + if label in cifar_classes: + cifar_classes[label].append(ind) + else: + cifar_classes[label] = [ind] + + per_participant_list = defaultdict(list) + no_classes = len(cifar_classes.keys()) + class_size = len(cifar_classes[0]) + datasize = {} + for n in range(no_classes): + random.shuffle(cifar_classes[n]) + sampled_probabilities = class_size * np.random.dirichlet( + np.array(no_participants * [alpha]) + ) + for user in range(no_participants): + no_imgs = int(round(sampled_probabilities[user])) + datasize[user, n] = no_imgs + sampled_list = cifar_classes[n][: min(len(cifar_classes[n]), no_imgs)] + per_participant_list[user].extend(sampled_list) + cifar_classes[n] = cifar_classes[n][min(len(cifar_classes[n]), no_imgs) :] + train_img_size = np.zeros(no_participants) + for i in range(no_participants): + train_img_size[i] = sum([datasize[i, j] for j in range(no_classes)]) + clas_weight = np.zeros((no_participants, no_classes)) + for i in range(no_participants): + for j in range(no_classes): + clas_weight[i, j] = float(datasize[i, j]) / float((train_img_size[i])) + return per_participant_list, clas_weight + + +def data_to_tensor(data): + """Load dataset to memory, applies transform.""" + loader = torch.utils.data.DataLoader(data, batch_size=len(data)) + img, label = next(iter(loader)) + return img, label + + +def noniid_partition_loader(data, m_per_shard=300, n_shards_per_client=2): + """Partition in semi-pathological fashion. + + 1. sort examples by label, form shards of size 300 by grouping points + successively + 2. each client is 2 random shards + most clients will have 2 digits, at most 4. + """ + # load data into memory + img, label = data_to_tensor(data) + + # sort + idx = torch.argsort(label) + img = img[idx] + label = label[idx] + + # split into n_shards of size m_per_shard + m = len(data) + assert m % m_per_shard == 0 + n_shards = m // m_per_shard + shards_idx = [ + torch.arange(m_per_shard * i, m_per_shard * (i + 1)) for i in range(n_shards) + ] + random.shuffle(shards_idx) # shuffle shards + + # pick shards to create a dataset for each client + assert n_shards % n_shards_per_client == 0 + n_clients = n_shards // n_shards_per_client + client_data = [ + torch.utils.data.TensorDataset( + torch.cat( + [ + img[shards_idx[j]] + for j in range( + i * n_shards_per_client, (i + 1) * n_shards_per_client + ) + ] + ), + torch.cat( + [ + label[shards_idx[j]] + for j in range( + i * n_shards_per_client, (i + 1) * n_shards_per_client + ) + ] + ), + ) + for i in range(n_clients) + ] + return client_data diff --git a/baselines/fedpara/fedpara/main.py b/baselines/fedpara/fedpara/main.py new file mode 100644 index 000000000000..2397a20e17ef --- /dev/null +++ b/baselines/fedpara/fedpara/main.py @@ -0,0 +1,124 @@ +"""Main script for running FedPara.""" + +from typing import Dict + +import flwr as fl +import hydra +from flwr.common import Scalar +from hydra.core.hydra_config import HydraConfig +from hydra.utils import instantiate +from omegaconf import DictConfig, OmegaConf + +from fedpara import client, server, utils +from fedpara.dataset import load_datasets +from fedpara.server import weighted_average +from fedpara.utils import ( + get_parameters, + save_results_as_pickle, + seed_everything, + set_client_state_save_path, +) + + +@hydra.main(config_path="conf", config_name="cifar10", version_base=None) +def main(cfg: DictConfig) -> None: + """Run the baseline. + + Parameters + ---------- + cfg : DictConfig + An omegaconf object that stores the hydra config. + """ + # 1. Print parsed config + print(OmegaConf.to_yaml(cfg)) + seed_everything(cfg.seed) + OmegaConf.to_container(cfg, resolve=True) + if "state_path" in cfg: + state_path = set_client_state_save_path(cfg.state_path) + else: + state_path = None + + # 2. Prepare dataset + train_loaders, test_loader = load_datasets( + config=cfg.dataset_config, + num_clients=cfg.num_clients, + batch_size=cfg.batch_size, + ) + + # 3. Define clients + # In this scheme the responsability of choosing the client is on the client manager + + client_fn = client.gen_client_fn( + train_loaders=train_loaders, + test_loader=test_loader, + model=cfg.model, + num_epochs=cfg.num_epochs, + args={"algorithm": cfg.algorithm}, + state_path=state_path, + ) + + def get_on_fit_config(): + def fit_config_fn(server_round: int): + fit_config: Dict[str, Scalar] = OmegaConf.to_container( + cfg.hyperparams, resolve=True + ) # type: ignore + fit_config["curr_round"] = server_round + return fit_config + + return fit_config_fn + + net_glob = instantiate(cfg.model) + + # 4. Define strategy + if cfg.strategy.min_evaluate_clients: + strategy = instantiate( + cfg.strategy, + on_fit_config_fn=get_on_fit_config(), + initial_parameters=fl.common.ndarrays_to_parameters( + get_parameters(net_glob) + ), + evaluate_metrics_aggregation_fn=weighted_average, + ) + + else: + evaluate_fn = server.gen_evaluate_fn( + num_clients=cfg.num_clients, + test_loader=test_loader, + model=cfg.model, + device=cfg.server_device, + ) + strategy = instantiate( + cfg.strategy, + evaluate_fn=evaluate_fn, + on_fit_config_fn=get_on_fit_config(), + initial_parameters=fl.common.ndarrays_to_parameters( + get_parameters(net_glob) + ), + ) + + # 5. Start Simulation + history = fl.simulation.start_simulation( + client_fn=client_fn, + num_clients=cfg.num_clients, + config=fl.server.ServerConfig(num_rounds=cfg.num_rounds), + strategy=strategy, + client_resources=cfg.client_resources, + ) + save_path = HydraConfig.get().runtime.output_dir + + save_results_as_pickle(history, file_path=save_path) + + # 6. Save results + file_suffix = "_".join([(net_glob).__class__.__name__, f"{cfg.exp_id}"]) + + utils.plot_metric_from_history( + hist=history, + save_plot_path=save_path, + suffix=file_suffix, + cfg=cfg, + model_size=net_glob.model_size[1], + ) + + +if __name__ == "__main__": + main() diff --git a/baselines/fedpara/fedpara/models.py b/baselines/fedpara/fedpara/models.py new file mode 100644 index 000000000000..370720ff58ce --- /dev/null +++ b/baselines/fedpara/fedpara/models.py @@ -0,0 +1,478 @@ +"""Model definitions for FedPara.""" + +import math +from typing import Dict, Tuple + +import numpy as np +import torch +import torch.nn.functional as F +from flwr.common import Scalar +from torch import nn +from torch.nn import init +from torch.utils.data import DataLoader + + +class LowRankNN(nn.Module): + """Fedpara Low-rank weight systhesis for fully connected layer.""" + + def __init__(self, input_, output, rank) -> None: + super().__init__() + + self.X = nn.Parameter( + torch.empty(size=(input_, rank)), + requires_grad=True, + ) + self.Y = nn.Parameter(torch.empty(size=(output, rank)), requires_grad=True) + + init.kaiming_normal_(self.X, mode="fan_out", nonlinearity="relu") + init.kaiming_normal_(self.Y, mode="fan_out", nonlinearity="relu") + + def forward(self): + """Forward pass.""" + out = torch.einsum("yr,xr->yx", self.Y, self.X) + return out + + +class Linear(nn.Module): + """Low-rank fully connected layer module for personalized scheme.""" + + def __init__(self, input_, output, ratio, bias=True) -> None: + super().__init__() + rank = self._calc_from_ratio(ratio, input_, output) + self.w1 = LowRankNN(input_, output, rank) + self.w2 = LowRankNN(input_, output, rank) + # make the bias for each layer + if bias: + self.bias = nn.Parameter(torch.zeros(output)) + + @staticmethod + def _calc_from_ratio(ratio, input_, output): + # Return the low-rank of sub-matrices given the compression ratio + # minimum possible parameter + r1 = int(np.ceil(np.sqrt(output))) + r2 = int(np.ceil(np.sqrt(input_))) + r = np.min((r1, r2)) + # maximum possible rank + # To solve it we need to know the roots of quadratic equation: 2*r*(m+n)=m*n + r3 = math.floor((output * input_) / (2 * (output + input_))) + rank = math.ceil((1 - ratio) * r + ratio * r3) + return rank + + def forward(self, x): + """Forward pass.""" + # personalized + w = self.w1() * self.w2() + self.w1() + out = F.linear(x, w, self.bias) + return out + + +class FC(nn.Module): + """2NN Fully connected layer as in the paper: https://arxiv.org/abs/1602.05629.""" + + def __init__( # pylint: disable=too-many-arguments + self, + input_size=28**2, + hidden_size=200, + num_classes=10, + ratio=0.5, + param_type="standard", + ): + super().__init__() + self.input_size = input_size + if param_type == "standard": + self.fc1 = nn.Linear(input_size, hidden_size) + self.fc2 = nn.Linear(hidden_size, 256) + self.out = nn.Linear(256, num_classes) + + elif param_type == "lowrank": + self.fc1 = Linear(input_size, hidden_size, ratio) + self.fc2 = Linear(hidden_size, 256, ratio) + self.out = Linear(256, num_classes, ratio) + + else: + raise ValueError("param_type must be either standard or lowrank") + + @property + def model_size(self): + """Return the total number of trainable parameters (in million paramaters) and. + + the size of the model in MB. + """ + total_trainable_params = ( + sum(p.numel() for p in self.parameters() if p.requires_grad) / 1e6 + ) + param_size = 0 + for param in self.parameters(): + param_size += param.nelement() * param.element_size() + buffer_size = 0 + for buffer in self.buffers(): + buffer_size += buffer.nelement() * buffer.element_size() + size_all_mb = (param_size + buffer_size) / 1024**2 + return total_trainable_params, size_all_mb + + def forward(self, x): + """Forward pass.""" + x = x.view(-1, self.input_size) + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.out(x) + return x + + +class LowRank(nn.Module): + """Fedpara Low-rank weight systhesis for Convolution layer.""" + + def __init__( # pylint: disable=too-many-arguments + self, + in_channels: int, + out_channels: int, + low_rank: int, + kernel_size: int, + ): + super().__init__() + self.T = nn.Parameter( + torch.empty(size=(low_rank, low_rank, kernel_size, kernel_size)), + requires_grad=True, + ) + self.X = nn.Parameter( + torch.empty(size=(low_rank, out_channels)), requires_grad=True + ) + self.Y = nn.Parameter( + torch.empty(size=(low_rank, in_channels)), requires_grad=True + ) + init.kaiming_normal_(self.T, mode="fan_out", nonlinearity="relu") + init.kaiming_normal_(self.X, mode="fan_out", nonlinearity="relu") + init.kaiming_normal_(self.Y, mode="fan_out", nonlinearity="relu") + + def forward(self): + """Forward pass.""" + # torch.einsum simplify the tensor produce (matrix multiplication) + return torch.einsum("xyzw,xo,yi->oizw", self.T, self.X, self.Y) + + +# pylint: disable=too-many-instance-attributes +class Conv2d(nn.Module): + """Convolutional layer with low-rank weights.""" + + def __init__( # pylint: disable=too-many-arguments + self, + in_channels: int, + out_channels: int, + kernel_size: int = 3, + stride: int = 1, + padding: int = 0, + bias: bool = False, + ratio: float = 0.1, + add_nonlinear: bool = False, + ): + super().__init__() + self.in_channels = in_channels + self.out_channels = out_channels + self.kernel_size = kernel_size + self.stride = stride + self.padding = padding + self.bias = bias + self.ratio = ratio + self.low_rank = self._calc_from_ratio() + self.add_nonlinear = add_nonlinear + self.W1 = LowRank(in_channels, out_channels, self.low_rank, kernel_size) + self.W2 = LowRank(in_channels, out_channels, self.low_rank, kernel_size) + self.bias = nn.Parameter(torch.zeros(out_channels)) if bias else None + self.tanh = nn.Tanh() + + def _calc_from_ratio(self): + # Return the low-rank of sub-matrices given the compression ratio + + # minimum possible parameter + r1 = int(np.ceil(np.sqrt(self.out_channels))) + r2 = int(np.ceil(np.sqrt(self.in_channels))) + r = np.min((r1, r2)) + + # maximum possible rank, + # To solve it we need to know the roots of quadratic equation: ax^2+bx+c=0 + # a = kernel**2 + # b = out channel+ in channel + # c = - num_target_params/2 + # r3 is floored because we cannot take the ceil as it results a bigger number + # of parameters than the original problem + + num_target_params = ( + self.out_channels * self.in_channels * (self.kernel_size**2) + ) + a, b, c = ( + self.kernel_size**2, + self.out_channels + self.in_channels, + -num_target_params / 2, + ) + discriminant = b**2 - 4 * a * c + r3 = math.floor((-b + math.sqrt(discriminant)) / (2 * a)) + ratio = math.ceil((1 - self.ratio) * r + self.ratio * r3) + return ratio + + def forward(self, x): + """Forward pass.""" + # Hadamard product of two submatrices + if self.add_nonlinear: + W = self.tanh(self.W1()) * self.tanh(self.W2()) + else: + W = self.W1() * self.W2() + out = F.conv2d( + input=x, weight=W, bias=self.bias, stride=self.stride, padding=self.padding + ) + return out + + +# pylint: disable=too-many-instance-attributes +class VGG(nn.Module): + """VGG16GN model.""" + + def __init__( # pylint: disable=too-many-arguments + self, + num_classes, + num_groups=2, + ratio=0.1, + param_type="lowrank", + add_nonlinear=False, + ): + super().__init__() + self.param_type = param_type + self.num_groups = num_groups + self.num_classes = num_classes + self.ratio = ratio + self.add_nonlinear = add_nonlinear + self.features = self._make_layers( + [ + 64, + 64, + "M", + 128, + 128, + "M", + 256, + 256, + 256, + "M", + 512, + 512, + 512, + "M", + 512, + 512, + 512, + "M", + ] + ) + self.classifier = nn.Sequential( + nn.Dropout(), + nn.Linear(512, 512), + nn.ReLU(inplace=True), + nn.Dropout(), + nn.Linear(512, 512), + nn.ReLU(inplace=True), + nn.Linear(512, num_classes), + ) + self._init_weights() + + def _init_weights(self): + """Initialize the weights.""" + for name, module in self.features.named_children(): + module = getattr(self.features, name) + if isinstance(module, nn.Conv2d): + if self.param_type == "lowrank": + num_channels = module.in_channels + setattr( + self.features, + name, + Conv2d( + num_channels, + module.out_channels, + module.kernel_size[0], + module.stride[0], + module.padding[0], + module.bias is not None, + ratio=self.ratio, + add_nonlinear=self.add_nonlinear, + # send the activation function to the Conv2d class + ), + ) + elif self.param_type == "standard": + n = ( + module.kernel_size[0] + * module.kernel_size[1] + * module.out_channels + ) + module.weight.data.normal_(0, math.sqrt(2.0 / n)) + module.bias.data.zero_() + + def _make_layers(self, cfg, group_norm=True): + layers = [] + in_channels = 3 + for v in cfg: + if v == "M": + layers += [nn.MaxPool2d(kernel_size=2, stride=2)] + else: + conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1) + if group_norm: + layers += [ + conv2d, + nn.GroupNorm(self.num_groups, v), + nn.ReLU(inplace=True), + ] + else: + layers += [conv2d, nn.ReLU(inplace=True)] + in_channels = v + return nn.Sequential(*layers) + + @property + def model_size(self): + """Return the total number of trainable parameters (in million paramaters) and. + + the size of the model in MB. + """ + total_trainable_params = ( + sum(p.numel() for p in self.parameters() if p.requires_grad) / 1e6 + ) + param_size = 0 + for param in self.parameters(): + param_size += param.nelement() * param.element_size() + buffer_size = 0 + for buffer in self.buffers(): + buffer_size += buffer.nelement() * buffer.element_size() + size_all_mb = (param_size + buffer_size) / 1024**2 + return total_trainable_params, size_all_mb + + def forward(self, x): + """Forward pass.""" + x = self.features(x) + x = x.view(x.size(0), -1) + x = self.classifier(x) + return x + + +# Create an instance of the VGG16GN model with Group Normalization, +# custom Conv2d, and modified classifier +def test( + net: nn.Module, test_loader: DataLoader, device: torch.device +) -> Tuple[float, float]: + """Evaluate the network on the entire test set. + + Parameters + ---------- + net : nn.Module + The neural network to test. + test_loader : DataLoader + The DataLoader containing the data to test the network on. + device : torch.device + The device on which the model should be tested, either 'cpu' or 'cuda'. + + Returns + ------- + Tuple[float, float] + The loss and the accuracy of the input model on the given data. + """ + if len(test_loader.dataset) == 0: + raise ValueError("Testloader can't be 0, exiting...") + + criterion = torch.nn.CrossEntropyLoss() + correct, total, loss = 0, 0, 0.0 + net.eval() + with torch.no_grad(): + for images, labels in test_loader: + images, labels = images.to(device), labels.to(device) + outputs = net(images) + loss += criterion(outputs, labels).item() + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + loss /= len(test_loader.dataset) + accuracy = correct / total + return loss, accuracy + + +def train( # pylint: disable=too-many-arguments + net: nn.Module, + trainloader: DataLoader, + device: torch.device, + epochs: int, + hyperparams: Dict[str, Scalar], + epoch: int, +) -> None: + """Train the network on the training set. + + Parameters + ---------- + net : nn.Module + The neural network to train. + trainloader : DataLoader + The DataLoader containing the data to train the network on. + device : torch.device + The device on which the model should be trained, either 'cpu' or 'cuda'. + epochs : int + The number of epochs the model should be trained for. + hyperparams : Dict[str, Scalar] + The hyperparameters to use for training. + """ + lr = float(hyperparams["eta_l"]) * float(hyperparams["learning_decay"]) ** ( + epoch - 1 + ) + criterion = torch.nn.CrossEntropyLoss() + optimizer = torch.optim.SGD( + net.parameters(), + lr=lr, + momentum=0, + weight_decay=0, + ) + net.train() + for _ in range(epochs): + net = _train_one_epoch( + net=net, + trainloader=trainloader, + device=device, + criterion=criterion, + optimizer=optimizer, + ) + + +def _train_one_epoch( # pylint: disable=too-many-arguments + net: nn.Module, + trainloader: DataLoader, + device: torch.device, + criterion, + optimizer, +) -> nn.Module: + """Train for one epoch. + + Parameters + ---------- + net : nn.Module + The neural network to train. + trainloader : DataLoader + The DataLoader containing the data to train the network on. + device : torch.device + The device on which the model should be trained, either 'cpu' or 'cuda'. + criterion : + The loss function to use for training + optimizer : + The optimizer to use for training + hyperparams : Dict[str, Scalar] + The hyperparameters to use for training. + + Returns + ------- + nn.Module + The model that has been trained for one epoch. + """ + for images, labels in trainloader: + images, labels = images.to(device), labels.to(device) + net.zero_grad() + log_probs = net(images) + loss = criterion(log_probs, labels) + loss.backward() + optimizer.step() + return net + + +if __name__ == "__main__": + model = VGG(num_classes=10, num_groups=2, param_type="standard", ratio=0.4) + # Print the modified VGG16GN model architecture + print(model.model_size) diff --git a/baselines/fedpara/fedpara/server.py b/baselines/fedpara/fedpara/server.py new file mode 100644 index 000000000000..036efa090c52 --- /dev/null +++ b/baselines/fedpara/fedpara/server.py @@ -0,0 +1,77 @@ +"""Global evaluation function.""" + +from collections import OrderedDict +from typing import Callable, Dict, List, Optional, Tuple + +import torch +from flwr.common import Metrics, NDArrays, Scalar +from hydra.utils import instantiate +from omegaconf import DictConfig +from torch.utils.data import DataLoader + +from fedpara.models import test + + +def get_on_fit_config(hypearparams: Dict): + """Generate fit config function.""" + + def fit_config_fn(server_round: int): + hypearparams["curr_round"] = server_round + return hypearparams + + return fit_config_fn + + +def gen_evaluate_fn( + num_clients: int, + test_loader: DataLoader, + model: DictConfig, + device, +) -> Callable[ + [int, NDArrays, Dict[str, Scalar]], Optional[Tuple[float, Dict[str, Scalar]]] +]: + """Generate a centralized evaluation function. + + Parameters + ---------- + model: DictConfig + The model details to evaluate. + test_loader : DataLoader + The dataloader to test the model with. + device : torch.device + The device to test the model on. + + Returns + ------- + Callable[ [int, NDArrays, Dict[str, Scalar]], + Optional[Tuple[float, Dict[str, Scalar]]] ] + The centralized evaluation function. + """ + + # pylint: disable=unused-argument + def evaluate( + server_round, + parameters_ndarrays: NDArrays, + __, + ) -> Optional[Tuple[float, Dict[str, Scalar]]]: + """Use the entire CIFAR-10/100 test set for evaluation.""" + net = instantiate(model) + params_dict = zip(net.state_dict().keys(), parameters_ndarrays) + state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict}) + net.load_state_dict(state_dict, strict=True) + net.to(device) + + loss, accuracy = test(net, test_loader, device=device) + return loss, {"accuracy": accuracy} + + return evaluate + + +def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics: + """Do weighted average of metrics.""" + # Multiply accuracy of each client by number of examples used + accuracies = [float(num_examples * m["accuracy"]) for num_examples, m in metrics] + examples = [num_examples for num_examples, _ in metrics] + print(f"accuracies: {sum(accuracies) / sum(examples)}") + # Aggregate and return custom metric (weighted average) + return {"accuracy": sum(accuracies) / sum(examples)} diff --git a/baselines/fedpara/fedpara/strategy.py b/baselines/fedpara/fedpara/strategy.py new file mode 100644 index 000000000000..17436c401c30 --- /dev/null +++ b/baselines/fedpara/fedpara/strategy.py @@ -0,0 +1,5 @@ +"""Optionally define a custom strategy. + +Needed only when the strategy is not yet implemented in Flower or because you want to +extend or modify the functionality of an existing strategy. +""" diff --git a/baselines/fedpara/fedpara/utils.py b/baselines/fedpara/fedpara/utils.py new file mode 100644 index 000000000000..fcaf95f50c59 --- /dev/null +++ b/baselines/fedpara/fedpara/utils.py @@ -0,0 +1,161 @@ +"""Utility functions for FedPara.""" + +import os +import pickle +import random +import time +from pathlib import Path +from secrets import token_hex +from typing import List, Optional + +import matplotlib.pyplot as plt +import numpy as np +import torch +from flwr.common import NDArrays +from flwr.server import History +from omegaconf import DictConfig +from torch.nn import Module + + +def plot_metric_from_history( + hist: History, + save_plot_path: str, + model_size: float, + cfg: DictConfig, + suffix: str = "", +) -> None: + """Plot the metrics from the history of the server. + + Parameters + ---------- + hist : History + Object containing evaluation for all rounds. + save_plot_path : str + Folder to save the plot to. + model_size : float + Size of the model in MB. + cfg : Optional + Optional dictionary containing the configuration of the experiment. + suffix: Optional + Optional string to add at the end of the filename for the plot. + """ + metric_dict = ( + hist.metrics_centralized + if hist.metrics_centralized + else hist.metrics_distributed + ) + _, axs = plt.subplots() + rounds, values_accuracy = zip(*metric_dict["accuracy"]) + r_cc = (i * 2 * model_size * int(cfg.clients_per_round) / 1024 for i in rounds) + + # Set the title + # make the suffix space seperated not underscore seperated + title = " ".join(suffix.split("_")) + axs.set_title(title) + axs.grid(True) + axs.plot(np.asarray([*r_cc]), np.asarray(values_accuracy)) + axs.set_ylabel("Accuracy") + axs.set_xlabel("Communication Cost (GB)") + fig_name = suffix + ".png" + plt.savefig(Path(save_plot_path) / Path(fig_name)) + plt.close() + + +def seed_everything(seed): + """Seed everything for reproducibility.""" + np.random.seed(seed) + torch.manual_seed(seed) + random.seed(seed) + torch.backends.cudnn.deterministic = True + + +def get_parameters(net: Module) -> NDArrays: + """Get the parameters of the network.""" + return [val.cpu().numpy() for _, val in net.state_dict().items()] + + +def save_results_as_pickle( + history: History, + file_path: str, + default_filename: Optional[str] = "history.pkl", +) -> None: + """Save results from simulation to pickle. + + Parameters + ---------- + history: History + History returned by start_simulation. + file_path: Union[str, Path] + Path to file to create and store both history and extra_results. + If path is a directory, the default_filename will be used. + path doesn't exist, it will be created. If file exists, a + randomly generated suffix will be added to the file name. This + is done to avoid overwritting results. + extra_results : Optional[Dict] + A dictionary containing additional results you would like + to be saved to disk. Default: {} (an empty dictionary) + default_filename: Optional[str] + File used by default if file_path points to a directory instead + to a file. Default: "results.pkl" + """ + path = Path(file_path) + + # ensure path exists + path.mkdir(exist_ok=True, parents=True) + + def _add_random_suffix(path_: Path): + """Add a random suffix to the file name.""" + print(f"File `{path_}` exists! ") + suffix = token_hex(4) + print(f"New results to be saved with suffix: {suffix}") + return path_.parent / (path_.stem + "_" + suffix + ".pkl") + + def _complete_path_with_default_name(path_: Path): + """Append the default file name to the path.""" + print("Using default filename") + if default_filename is None: + return path_ + return path_ / default_filename + + if path.is_dir(): + path = _complete_path_with_default_name(path) + + if path.is_file(): + path = _add_random_suffix(path) + + print(f"Results will be saved into: {path}") + # data = {"history": history, **extra_results} + data = {"history": history} + # save results to pickle + with open(str(path), "wb") as handle: + pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL) + + +def set_client_state_save_path(path: str) -> str: + """Set the client state save path.""" + client_state_save_path = time.strftime("%Y-%m-%d") + client_state_sub_path = time.strftime("%H-%M-%S") + client_state_save_path = f"{path}{client_state_save_path}/{client_state_sub_path}" + if not os.path.exists(client_state_save_path): + os.makedirs(client_state_save_path) + return client_state_save_path + + +def get_keys_state_dict(model, algorithm, mode: str = "local") -> List[str]: + """.""" + keys: List[str] = [] + match algorithm: + case "fedper": + if mode == "local": + keys = list(filter(lambda x: "fc1" not in x, model.state_dict().keys())) + elif mode == "global": + keys = list(filter(lambda x: "fc1" in x, model.state_dict().keys())) + case "pfedpara": + if mode == "local": + keys = list(filter(lambda x: "w2" in x, model.state_dict().keys())) + elif mode == "global": + keys = list(filter(lambda x: "w1" in x, model.state_dict().keys())) + case _: + raise NotImplementedError(f"algorithm {algorithm} not implemented") + + return keys diff --git a/baselines/fedpara/pyproject.toml b/baselines/fedpara/pyproject.toml new file mode 100644 index 000000000000..c5751c536043 --- /dev/null +++ b/baselines/fedpara/pyproject.toml @@ -0,0 +1,142 @@ +[build-system] +requires = ["poetry-core>=1.4.0"] +build-backend = "poetry.masonry.api" + +[tool.poetry] +name = "fedpara" # <----- Ensure it matches the name of your baseline directory containing all the source code +version = "1.0.0" +description = "Flower Baselines" +license = "Apache-2.0" +authors = ["Yahia Salaheldin Shaaban ", "Omar Mokhtar < >", "Roeia Amr < >"] +readme = "README.md" +homepage = "https://flower.dev" +repository = "https://github.com/adap/flower" +documentation = "https://flower.dev" +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: Apache Software License", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: Implementation :: CPython", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Scientific/Engineering :: Mathematics", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + "Typing :: Typed", +] + +[tool.poetry.dependencies] +python = ">=3.10, <3.12.0" # don't change this +flwr = { extras = ["simulation"], version = "1.5.0" } +hydra-core = "1.3.2" # don't change this +matplotlib = "^3.7.2" +tqdm = "^4.66.1" +torch = {url = "https://download.pytorch.org/whl/cu117/torch-2.0.1%2Bcu117-cp310-cp310-linux_x86_64.whl"} +torchvision = { url = "https://download.pytorch.org/whl/cu117/torchvision-0.15.2%2Bcu117-cp310-cp310-linux_x86_64.whl"} + +[tool.poetry.dev-dependencies] +isort = "==5.11.5" +black = "==23.1.0" +docformatter = "==1.5.1" +mypy = "==1.4.1" +pylint = "==2.8.2" +flake8 = "==3.9.2" +pytest = "==6.2.4" +pytest-watch = "==4.2.0" +ruff = "==0.0.272" +types-requests = "==2.27.7" +virtualenv = "20.21.0" + +[tool.isort] +line_length = 88 +indent = " " +multi_line_output = 3 +include_trailing_comma = true +force_grid_wrap = 0 +use_parentheses = true + +[tool.black] +line-length = 88 +target-version = ["py38", "py39", "py310", "py311"] + +[tool.pytest.ini_options] +minversion = "6.2" +addopts = "-qq" +testpaths = [ + "flwr_baselines", +] + +[tool.mypy] +ignore_missing_imports = true +strict = false +plugins = "numpy.typing.mypy_plugin" + +[tool.pylint."MESSAGES CONTROL"] +disable = "bad-continuation,duplicate-code,too-few-public-methods,useless-import-alias" +good-names = "i,j,k,_,x,y,X,Y,m,w1,w2,W1,W2,n,r1,r2,r3,W,w,v,lr,f,r,T,a,b,c" +signature-mutators="hydra.main.main" + +[tool.pylint.typecheck] +generated-members="numpy.*, torch.*, tensorflow.*" + +[[tool.mypy.overrides]] +module = [ + "importlib.metadata.*", + "importlib_metadata.*", +] +follow_imports = "skip" +follow_imports_for_stubs = true +disallow_untyped_calls = false + +[[tool.mypy.overrides]] +module = "torch.*" +follow_imports = "skip" +follow_imports_for_stubs = true + +[tool.docformatter] +wrap-summaries = 88 +wrap-descriptions = 88 + +[tool.ruff] +target-version = "py38" +line-length = 88 +select = ["D", "E", "F", "W", "B", "ISC", "C4"] +fixable = ["D", "E", "F", "W", "B", "ISC", "C4"] +ignore = ["B024", "B027"] +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".hg", + ".mypy_cache", + ".nox", + ".pants.d", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "venv", + "proto", +] + +[tool.ruff.pydocstyle] +convention = "numpy" diff --git a/doc/source/ref-changelog.md b/doc/source/ref-changelog.md index f1af015ac24c..685553aee657 100644 --- a/doc/source/ref-changelog.md +++ b/doc/source/ref-changelog.md @@ -24,6 +24,8 @@ - FedAvgM [#2246](https://github.com/adap/flower/pull/2246) + - FedPara [#2722](https://github.com/adap/flower/pull/2722) + ## v1.6.0 (2023-11-28) ### Thanks to our contributors