diff --git a/baselines/hfedxgboost/hfedxgboost/conf/base.yaml b/baselines/hfedxgboost/hfedxgboost/conf/base.yaml index 310b123a9054..284aab88e316 100644 --- a/baselines/hfedxgboost/hfedxgboost/conf/base.yaml +++ b/baselines/hfedxgboost/hfedxgboost/conf/base.yaml @@ -37,7 +37,7 @@ client_resources: num_gpus: 0.0 strategy: - _target_: flwr.server.strategy.FedXgbNnAvg + _target_: hfedxgboost.strategy.FedXgbNnAvg _recursive_: true #everything to be instantiated fraction_fit: 1.0 fraction_evaluate: 0.0 # no clients will be sampled for federated evaluation (we will still perform global evaluation) diff --git a/baselines/hfedxgboost/hfedxgboost/strategy.py b/baselines/hfedxgboost/hfedxgboost/strategy.py index 17436c401c30..eb067a89e5f0 100644 --- a/baselines/hfedxgboost/hfedxgboost/strategy.py +++ b/baselines/hfedxgboost/hfedxgboost/strategy.py @@ -3,3 +3,77 @@ Needed only when the strategy is not yet implemented in Flower or because you want to extend or modify the functionality of an existing strategy. """ +from logging import WARNING +from typing import Any, Dict, List, Optional, Tuple, Union + +from flwr.common import FitRes, Scalar, ndarrays_to_parameters, parameters_to_ndarrays +from flwr.common.logger import log +from flwr.server.client_proxy import ClientProxy + +from flwr.server.strategy.aggregate import aggregate +from flwr.server.strategy import FedAvg + + +class FedXgbNnAvg(FedAvg): + """Configurable FedXgbNnAvg strategy implementation.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Federated XGBoost [Ma et al., 2023] strategy. + + Implementation based on https://arxiv.org/abs/2304.07537. + """ + super().__init__(*args, **kwargs) + + def __repr__(self) -> str: + """Compute a string representation of the strategy.""" + rep = f"FedXgbNnAvg(accept_failures={self.accept_failures})" + return rep + + def evaluate( + self, server_round: int, parameters: Any + ) -> Optional[Tuple[float, Dict[str, Scalar]]]: + """Evaluate model parameters using an evaluation function.""" + if self.evaluate_fn is None: + # No evaluation function provided + return None + eval_res = self.evaluate_fn(server_round, parameters, {}) + if eval_res is None: + return None + loss, metrics = eval_res + return loss, metrics + + def aggregate_fit( + self, + server_round: int, + results: List[Tuple[ClientProxy, FitRes]], + failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]], + ) -> Tuple[Optional[Any], Dict[str, Scalar]]: + """Aggregate fit results using weighted average.""" + if not results: + return None, {} + # Do not aggregate if there are failures and failures are not accepted + if not self.accept_failures and failures: + return None, {} + + # Convert results + weights_results = [ + ( + parameters_to_ndarrays(fit_res.parameters[0].parameters), # type: ignore # noqa: E501 # pylint: disable=line-too-long + fit_res.num_examples, + ) + for _, fit_res in results + ] + parameters_aggregated = ndarrays_to_parameters(aggregate(weights_results)) + + # Aggregate XGBoost trees from all clients + trees_aggregated = [fit_res.parameters[1] for _, fit_res in results] # type: ignore # noqa: E501 # pylint: disable=line-too-long + + # Aggregate custom metrics if aggregation fn was provided + metrics_aggregated = {} + if self.fit_metrics_aggregation_fn: + fit_metrics = [(res.num_examples, res.metrics) for _, res in results] + metrics_aggregated = self.fit_metrics_aggregation_fn(fit_metrics) + elif server_round == 1: # Only log this warning once + log(WARNING, "No fit_metrics_aggregation_fn provided") + + 
return [parameters_aggregated, trees_aggregated], metrics_aggregated diff --git a/examples/quickstart-xgboost-horizontal/.gitignore b/examples/quickstart-xgboost-horizontal/.gitignore deleted file mode 100644 index 4a6ddf5b9142..000000000000 --- a/examples/quickstart-xgboost-horizontal/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -dataset - diff --git a/examples/quickstart-xgboost-horizontal/README.md b/examples/quickstart-xgboost-horizontal/README.md deleted file mode 100644 index 346a33da7412..000000000000 --- a/examples/quickstart-xgboost-horizontal/README.md +++ /dev/null @@ -1,19 +0,0 @@ -# Federated XGBoost in Horizontal Setting (PyTorch) - -[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/adap/flower/blob/main/examples/quickstart-xgboost-horizontal/code_horizontal.ipynb) (or open the [Jupyter Notebook](https://github.com/adap/flower/blob/main/examples/quickstart-xgboost-horizontal/code_horizontal.ipynb)) - -This example demonstrates federated XGBoost using Flower with PyTorch. It is a novel method for conducting federated XGBoost in the horizontal setting, which differs from previous methods in the following ways: - -- We aggregate and conduct federated learning on the clients' tree prediction outcomes, by sending the clients' built XGBoost trees to the server and then sharing them with the clients. - -The exchange of privacy-sensitive information (gradients) is not needed. - -The model is a CNN whose 1D convolution kernel size equals the number of XGBoost trees in the client tree ensembles. - -Using 1D convolution, we make the tree learning rate (a hyperparameter of XGBoost) learnable. - -## Project Setup - -This implementation can be easily run in Google Colab with the button at the top of the README or as a standalone Jupyter notebook. -It will automatically download and extract the example data into a `dataset` folder with `binary_classification` and `regression` sub-folders. - -## Datasets - -This implementation supports both binary classification and regression datasets in SVM light format, loaded from [LIBSVM Data](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/). Simply download the dataset files from the website and put them in the folder location indicated above.
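For context, this PR relocates `FedXgbNnAvg` from Flower core into the `hfedxgboost` baseline, which is why `base.yaml` now points `_target_` at `hfedxgboost.strategy.FedXgbNnAvg`. A minimal sketch of how that config resolves at runtime, assuming Hydra/OmegaConf as used throughout the Flower baselines (the config path is taken from this diff; everything else is illustrative):

```python
# Sketch: build the relocated strategy from the baseline's Hydra config.
# Assumes hydra-core/omegaconf are installed and hfedxgboost is importable.
from hydra.utils import instantiate
from omegaconf import OmegaConf

cfg = OmegaConf.load("baselines/hfedxgboost/hfedxgboost/conf/base.yaml")
# With `_target_: hfedxgboost.strategy.FedXgbNnAvg`, this now constructs the
# baseline's local copy rather than the deprecated flwr.server.strategy class.
strategy = instantiate(cfg.strategy)
```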
diff --git a/examples/quickstart-xgboost-horizontal/code_horizontal.ipynb b/examples/quickstart-xgboost-horizontal/code_horizontal.ipynb deleted file mode 100644 index 4d76e0c26023..000000000000 --- a/examples/quickstart-xgboost-horizontal/code_horizontal.ipynb +++ /dev/null @@ -1,1560 +0,0 @@ -{ - "cells": [ - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Initialization" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "executionInfo": { - "elapsed": 15871, - "status": "ok", - "timestamp": 1670356049976, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - }, - "outputId": "2c588ea0-a383-4461-e633-794e73d0f57a" - }, - "outputs": [], - "source": [ - "import os\n", - "import urllib.request\n", - "import bz2\n", - "import shutil\n", - "\n", - "CLASSIFICATION_PATH = os.path.join(\"dataset\", \"binary_classification\")\n", - "REGRESSION_PATH = os.path.join(\"dataset\", \"regression\")\n", - "\n", - "if not os.path.exists(CLASSIFICATION_PATH):\n", - " os.makedirs(CLASSIFICATION_PATH)\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/cod-rna\",\n", - " f\"{os.path.join(CLASSIFICATION_PATH, 'cod-rna')}\",\n", - " )\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/cod-rna.t\",\n", - " f\"{os.path.join(CLASSIFICATION_PATH, 'cod-rna.t')}\",\n", - " )\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/cod-rna.r\",\n", - " f\"{os.path.join(CLASSIFICATION_PATH, 'cod-rna.r')}\",\n", - " )\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/ijcnn1.t.bz2\",\n", - " f\"{os.path.join(CLASSIFICATION_PATH, 'ijcnn1.t.bz2')}\",\n", - " )\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/ijcnn1.tr.bz2\",\n", - " f\"{os.path.join(CLASSIFICATION_PATH, 'ijcnn1.tr.bz2')}\",\n", - " )\n", - " for filepath in os.listdir(CLASSIFICATION_PATH):\n", - " if filepath[-3:] == \"bz2\":\n", - " abs_filepath = os.path.join(CLASSIFICATION_PATH, filepath)\n", - " with bz2.BZ2File(abs_filepath) as fr, open(abs_filepath[:-4], \"wb\") as fw:\n", - " shutil.copyfileobj(fr, fw)\n", - "\n", - "if not os.path.exists(REGRESSION_PATH):\n", - " os.makedirs(REGRESSION_PATH)\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/eunite2001\",\n", - " f\"{os.path.join(REGRESSION_PATH, 'eunite2001')}\",\n", - " )\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/eunite2001.t\",\n", - " f\"{os.path.join(REGRESSION_PATH, 'eunite2001.t')}\",\n", - " )\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/YearPredictionMSD.bz2\",\n", - " f\"{os.path.join(REGRESSION_PATH, 'YearPredictionMSD.bz2')}\",\n", - " )\n", - " urllib.request.urlretrieve(\n", - " \"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/YearPredictionMSD.t.bz2\",\n", - " f\"{os.path.join(REGRESSION_PATH, 'YearPredictionMSD.t.bz2')}\",\n", - " )\n", - " for filepath in os.listdir(REGRESSION_PATH):\n", - " if filepath[-3:] == \"bz2\":\n", - " abs_filepath = os.path.join(REGRESSION_PATH, filepath)\n", - " with 
bz2.BZ2File(abs_filepath) as fr, open(abs_filepath[:-4], \"wb\") as fw:\n", - " shutil.copyfileobj(fr, fw)\n", - "\n", - "\n", - "!nvidia-smi\n", - "!pip install matplotlib scikit-learn tqdm torch torchmetrics torchsummary xgboost\n", - "!pip install -U \"flwr-nightly[simulation]\"" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Import relevant modules" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "executionInfo": { - "elapsed": 7, - "status": "ok", - "timestamp": 1670356049977, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - }, - "outputId": "5289e33e-e18e-491b-d536-6b1052598994" - }, - "outputs": [], - "source": [ - "import xgboost as xgb\n", - "from xgboost import XGBClassifier, XGBRegressor\n", - "from sklearn.metrics import mean_squared_error, accuracy_score\n", - "from sklearn.datasets import load_svmlight_file\n", - "\n", - "import numpy as np\n", - "import torch, torch.nn as nn\n", - "import torch.nn.functional as F\n", - "import torchvision\n", - "from torchmetrics import Accuracy, MeanSquaredError\n", - "from tqdm import trange, tqdm\n", - "from torchsummary import summary\n", - "from torch.utils.data import DataLoader, Dataset, random_split\n", - "\n", - "print(\"Imported modules.\")" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Import relevant Flower modules for Federated XGBoost" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import flwr as fl\n", - "from flwr.common.typing import Parameters\n", - "from collections import OrderedDict\n", - "from typing import Any, Dict, List, Optional, Tuple, Union\n", - "from flwr.common import NDArray, NDArrays\n", - "\n", - "print(\"Imported modules.\")" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Define utility functions for xgboost trees" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from matplotlib import pyplot as plt # pylint: disable=E0401\n", - "\n", - "\n", - "def plot_xgbtree(tree: Union[XGBClassifier, XGBRegressor], n_tree: int) -> None:\n", - " \"\"\"Visualize the built xgboost tree.\"\"\"\n", - " xgb.plot_tree(tree, num_trees=n_tree)\n", - " plt.rcParams[\"figure.figsize\"] = [50, 10]\n", - " plt.show()\n", - "\n", - "\n", - "def construct_tree(\n", - " dataset: Dataset, label: NDArray, n_estimators: int, tree_type: str\n", - ") -> Union[XGBClassifier, XGBRegressor]:\n", - " \"\"\"Construct an xgboost tree from a tabular dataset.\"\"\"\n", - " if tree_type == \"BINARY\":\n", - " tree = xgb.XGBClassifier(\n", - " objective=\"binary:logistic\",\n", - " learning_rate=0.1,\n", - " max_depth=8,\n", - " n_estimators=n_estimators,\n", - " subsample=0.8,\n", - " colsample_bylevel=1,\n", - " colsample_bynode=1,\n", - " colsample_bytree=1,\n", - " alpha=5,\n", - " gamma=5,\n", - " num_parallel_tree=1,\n", - " min_child_weight=1,\n", - " )\n", - "\n", - " elif tree_type == \"REG\":\n", - " tree = xgb.XGBRegressor(\n", - " objective=\"reg:squarederror\",\n", - " learning_rate=0.1,\n", - " max_depth=8,\n", - " n_estimators=n_estimators,\n", - " subsample=0.8,\n", - " colsample_bylevel=1,\n", - " colsample_bynode=1,\n", - " colsample_bytree=1,\n", - " alpha=5,\n", - " gamma=5,\n", - " num_parallel_tree=1,\n", - " 
min_child_weight=1,\n", - " )\n", - "\n", - " tree.fit(dataset, label)\n", - " return tree\n", - "\n", - "\n", - "def construct_tree_from_loader(\n", - " dataset_loader: DataLoader, n_estimators: int, tree_type: str\n", - ") -> Union[XGBClassifier, XGBRegressor]:\n", - " \"\"\"Construct an xgboost tree from a tabular dataset loader.\"\"\"\n", - " for dataset in dataset_loader:\n", - " data, label = dataset[0], dataset[1]\n", - " return construct_tree(data, label, n_estimators, tree_type)\n", - "\n", - "\n", - "def single_tree_prediction(\n", - " tree: Union[XGBClassifier, XGBRegressor], n_tree: int, dataset: NDArray\n", - ") -> Optional[NDArray]:\n", - " \"\"\"Extract the prediction result of a single tree in the xgboost tree\n", - " ensemble.\"\"\"\n", - " # How to access a single tree\n", - " # https://github.com/bmreiniger/datascience.stackexchange/blob/master/57905.ipynb\n", - " num_t = len(tree.get_booster().get_dump())\n", - " if n_tree > num_t:\n", - " print(\n", - " \"The tree index to be extracted is larger than the total number of trees.\"\n", - " )\n", - " return None\n", - "\n", - " return tree.predict( # type: ignore\n", - " dataset, iteration_range=(n_tree, n_tree + 1), output_margin=True\n", - " )\n", - "\n", - "\n", - "def tree_encoding( # pylint: disable=R0914\n", - " trainloader: DataLoader,\n", - " client_trees: Union[\n", - " Tuple[XGBClassifier, int],\n", - " Tuple[XGBRegressor, int],\n", - " List[Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]],\n", - " ],\n", - " client_tree_num: int,\n", - " client_num: int,\n", - ") -> Optional[Tuple[NDArray, NDArray]]:\n", - " \"\"\"Transform the tabular dataset into prediction results using the\n", - " aggregated xgboost tree ensembles from all clients.\"\"\"\n", - " if trainloader is None:\n", - " return None\n", - "\n", - " for local_dataset in trainloader:\n", - " x_train, y_train = local_dataset[0], local_dataset[1]\n", - "\n", - " x_train_enc = np.zeros((x_train.shape[0], client_num * client_tree_num))\n", - " x_train_enc = np.array(x_train_enc, copy=True)\n", - "\n", - " temp_trees: Any = None\n", - " if isinstance(client_trees, list) is False:\n", - " temp_trees = [client_trees[0]] * client_num\n", - " elif isinstance(client_trees, list) and len(client_trees) != client_num:\n", - " temp_trees = [client_trees[0][0]] * client_num\n", - " else:\n", - " cids = []\n", - " temp_trees = []\n", - " for i, _ in enumerate(client_trees):\n", - " temp_trees.append(client_trees[i][0]) # type: ignore\n", - " cids.append(client_trees[i][1]) # type: ignore\n", - " sorted_index = np.argsort(np.asarray(cids))\n", - " temp_trees = np.asarray(temp_trees)[sorted_index]\n", - "\n", - " for i, _ in enumerate(temp_trees):\n", - " for j in range(client_tree_num):\n", - " x_train_enc[:, i * client_tree_num + j] = single_tree_prediction(\n", - " temp_trees[i], j, x_train\n", - " )\n", - "\n", - " x_train_enc32: Any = np.float32(x_train_enc)\n", - " y_train32: Any = np.float32(y_train)\n", - "\n", - " x_train_enc32, y_train32 = torch.from_numpy(\n", - " np.expand_dims(x_train_enc32, axis=1) # type: ignore\n", - " ), torch.from_numpy(\n", - " np.expand_dims(y_train32, axis=-1) # type: ignore\n", - " )\n", - " return x_train_enc32, y_train32" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Manually download and load the tabular dataset from LIBSVM data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - 
"executionInfo": { - "elapsed": 26613, - "status": "ok", - "timestamp": 1670356076585, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - }, - "outputId": "22843504-faf0-44cf-aedd-1df8d0ec87a6" - }, - "outputs": [], - "source": [ - "# Datasets can be downloaded from LIBSVM Data: https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/\n", - "binary_train = [\"cod-rna.t\", \"cod-rna\", \"ijcnn1.t\"]\n", - "binary_test = [\"cod-rna.r\", \"cod-rna.t\", \"ijcnn1.tr\"]\n", - "reg_train = [\"eunite2001\", \"YearPredictionMSD\"]\n", - "reg_test = [\"eunite2001.t\", \"YearPredictionMSD.t\"]\n", - "\n", - "# Define the type of training task. Binary classification: BINARY; Regression: REG\n", - "task_types = [\"BINARY\", \"REG\"]\n", - "task_type = task_types[0]\n", - "\n", - "# Select the downloaded training and test dataset\n", - "if task_type == \"BINARY\":\n", - " dataset_path = \"dataset/binary_classification/\"\n", - " train = binary_train[0]\n", - " test = binary_test[0]\n", - "elif task_type == \"REG\":\n", - " dataset_path = \"dataset/regression/\"\n", - " train = reg_train[0]\n", - " test = reg_test[0]\n", - "\n", - "data_train = load_svmlight_file(dataset_path + train, zero_based=False)\n", - "data_test = load_svmlight_file(dataset_path + test, zero_based=False)\n", - "\n", - "print(\"Task type selected is: \" + task_type)\n", - "print(\"Training dataset is: \" + train)\n", - "print(\"Test dataset is: \" + test)" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Preprocess the tabular dataset" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "class TreeDataset(Dataset):\n", - " def __init__(self, data: NDArray, labels: NDArray) -> None:\n", - " self.labels = labels\n", - " self.data = data\n", - "\n", - " def __len__(self) -> int:\n", - " return len(self.labels)\n", - "\n", - " def __getitem__(self, idx: int) -> Dict[int, NDArray]:\n", - " label = self.labels[idx]\n", - " data = self.data[idx, :]\n", - " sample = {0: data, 1: label}\n", - " return sample\n", - "\n", - "\n", - "X_train = data_train[0].toarray()\n", - "y_train = data_train[1]\n", - "X_test = data_test[0].toarray()\n", - "y_test = data_test[1]\n", - "X_train.flags.writeable = True\n", - "y_train.flags.writeable = True\n", - "X_test.flags.writeable = True\n", - "y_test.flags.writeable = True\n", - "\n", - "# If the feature dimensions of the trainset and testset do not agree,\n", - "# specify n_features in the load_svmlight_file function in the above cell.\n", - "# https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_svmlight_file.html\n", - "print(\"Feature dimension of the dataset:\", X_train.shape[1])\n", - "print(\"Size of the trainset:\", X_train.shape[0])\n", - "print(\"Size of the testset:\", X_test.shape[0])\n", - "assert X_train.shape[1] == X_test.shape[1]\n", - "\n", - "if task_type == \"BINARY\":\n", - " y_train[y_train == -1] = 0\n", - " y_test[y_test == -1] = 0\n", - "\n", - "trainset = TreeDataset(np.array(X_train, copy=True), np.array(y_train, copy=True))\n", - "testset = TreeDataset(np.array(X_test, copy=True), np.array(y_test, copy=True))" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Conduct tabular dataset partition for Federated Learning" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def get_dataloader(\n", 
- " dataset: Dataset, partition: str, batch_size: Union[int, str]\n", - ") -> DataLoader:\n", - " if batch_size == \"whole\":\n", - " batch_size = len(dataset)\n", - " return DataLoader(\n", - " dataset, batch_size=batch_size, pin_memory=True, shuffle=(partition == \"train\")\n", - " )\n", - "\n", - "\n", - "# https://github.com/adap/flower\n", - "def do_fl_partitioning(\n", - " trainset: Dataset,\n", - " testset: Dataset,\n", - " pool_size: int,\n", - " batch_size: Union[int, str],\n", - " val_ratio: float = 0.0,\n", - ") -> Tuple[DataLoader, DataLoader, DataLoader]:\n", - " # Split training set into `num_clients` partitions to simulate different local datasets\n", - " partition_size = len(trainset) // pool_size\n", - " lengths = [partition_size] * pool_size\n", - " if sum(lengths) != len(trainset):\n", - " lengths[-1] = len(trainset) - sum(lengths[0:-1])\n", - " datasets = random_split(trainset, lengths, torch.Generator().manual_seed(0))\n", - "\n", - " # Split each partition into train/val and create DataLoader\n", - " trainloaders = []\n", - " valloaders = []\n", - " for ds in datasets:\n", - " len_val = int(len(ds) * val_ratio)\n", - " len_train = len(ds) - len_val\n", - " lengths = [len_train, len_val]\n", - " ds_train, ds_val = random_split(ds, lengths, torch.Generator().manual_seed(0))\n", - " trainloaders.append(get_dataloader(ds_train, \"train\", batch_size))\n", - " if len_val != 0:\n", - " valloaders.append(get_dataloader(ds_val, \"val\", batch_size))\n", - " else:\n", - " valloaders = None\n", - " testloader = get_dataloader(testset, \"test\", batch_size)\n", - " return trainloaders, valloaders, testloader" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Define global variables for Federated XGBoost Learning" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# The number of clients participated in the federated learning\n", - "client_num = 5\n", - "\n", - "# The number of XGBoost trees in the tree ensemble that will be built for each client\n", - "client_tree_num = 500 // client_num" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Build global XGBoost tree for comparison" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "executionInfo": { - "elapsed": 1080216, - "status": "ok", - "timestamp": 1670357156788, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - }, - "outputId": "d56f2821-5cd5-49ff-c5dc-f8d088eed799" - }, - "outputs": [], - "source": [ - "global_tree = construct_tree(X_train, y_train, client_tree_num, task_type)\n", - "preds_train = global_tree.predict(X_train)\n", - "preds_test = global_tree.predict(X_test)\n", - "\n", - "if task_type == \"BINARY\":\n", - " result_train = accuracy_score(y_train, preds_train)\n", - " result_test = accuracy_score(y_test, preds_test)\n", - " print(\"Global XGBoost Training Accuracy: %f\" % (result_train))\n", - " print(\"Global XGBoost Testing Accuracy: %f\" % (result_test))\n", - "elif task_type == \"REG\":\n", - " result_train = mean_squared_error(y_train, preds_train)\n", - " result_test = mean_squared_error(y_test, preds_test)\n", - " print(\"Global XGBoost Training MSE: %f\" % (result_train))\n", - " print(\"Global XGBoost Testing MSE: %f\" % (result_test))\n", - "\n", - "print(global_tree)" - ] - }, - { - "attachments": {}, - "cell_type": 
"markdown", - "metadata": {}, - "source": [ - "## Simulate local XGBoost trees on clients for comparison" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "executionInfo": { - "elapsed": 242310, - "status": "ok", - "timestamp": 1670357399084, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - }, - "outputId": "0739df9f-84de-4749-8de1-7bd7c6a32ccc" - }, - "outputs": [], - "source": [ - "client_trees_comparison = []\n", - "trainloaders, _, testloader = do_fl_partitioning(\n", - " trainset, testset, pool_size=client_num, batch_size=\"whole\", val_ratio=0.0\n", - ")\n", - "\n", - "for i, trainloader in enumerate(trainloaders):\n", - " for local_dataset in trainloader:\n", - " local_X_train, local_y_train = local_dataset[0], local_dataset[1]\n", - " tree = construct_tree(local_X_train, local_y_train, client_tree_num, task_type)\n", - " client_trees_comparison.append(tree)\n", - "\n", - " preds_train = client_trees_comparison[-1].predict(local_X_train)\n", - " preds_test = client_trees_comparison[-1].predict(X_test)\n", - "\n", - " if task_type == \"BINARY\":\n", - " result_train = accuracy_score(local_y_train, preds_train)\n", - " result_test = accuracy_score(y_test, preds_test)\n", - " print(\"Local Client %d XGBoost Training Accuracy: %f\" % (i, result_train))\n", - " print(\"Local Client %d XGBoost Testing Accuracy: %f\" % (i, result_test))\n", - " elif task_type == \"REG\":\n", - " result_train = mean_squared_error(local_y_train, preds_train)\n", - " result_test = mean_squared_error(y_test, preds_test)\n", - " print(\"Local Client %d XGBoost Training MSE: %f\" % (i, result_train))\n", - " print(\"Local Client %d XGBoost Testing MSE: %f\" % (i, result_test))" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Centralized Federated XGBoost\n", - "#### Create 1D convolutional neural network on trees prediction results. \n", - "#### 1D kernel size == client_tree_num\n", - "#### Make the learning rate of the tree ensembles learnable." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "executionInfo": { - "elapsed": 38, - "status": "ok", - "timestamp": 1670363021675, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - } - }, - "outputs": [], - "source": [ - "class CNN(nn.Module):\n", - " def __init__(self, n_channel: int = 64) -> None:\n", - " super(CNN, self).__init__()\n", - " n_out = 1\n", - " self.task_type = task_type\n", - " self.conv1d = nn.Conv1d(\n", - " 1, n_channel, kernel_size=client_tree_num, stride=client_tree_num, padding=0\n", - " )\n", - " self.layer_direct = nn.Linear(n_channel * client_num, n_out)\n", - " self.ReLU = nn.ReLU()\n", - " self.Sigmoid = nn.Sigmoid()\n", - " self.Identity = nn.Identity()\n", - "\n", - " # Add weight initialization\n", - " for layer in self.modules():\n", - " if isinstance(layer, nn.Linear):\n", - " nn.init.kaiming_uniform_(\n", - " layer.weight, mode=\"fan_in\", nonlinearity=\"relu\"\n", - " )\n", - "\n", - " def forward(self, x: torch.Tensor) -> torch.Tensor:\n", - " x = self.ReLU(self.conv1d(x))\n", - " x = x.flatten(start_dim=1)\n", - " x = self.ReLU(x)\n", - " if self.task_type == \"BINARY\":\n", - " x = self.Sigmoid(self.layer_direct(x))\n", - " elif self.task_type == \"REG\":\n", - " x = self.Identity(self.layer_direct(x))\n", - " return x\n", - "\n", - " def get_weights(self) -> fl.common.NDArrays:\n", - " \"\"\"Get model weights as a list of NumPy ndarrays.\"\"\"\n", - " return [\n", - " np.array(val.cpu().numpy(), copy=True)\n", - " for _, val in self.state_dict().items()\n", - " ]\n", - "\n", - " def set_weights(self, weights: fl.common.NDArrays) -> None:\n", - " \"\"\"Set model weights from a list of NumPy ndarrays.\"\"\"\n", - " layer_dict = {}\n", - " for k, v in zip(self.state_dict().keys(), weights):\n", - " if v.ndim != 0:\n", - " layer_dict[k] = torch.Tensor(np.array(v, copy=True))\n", - " state_dict = OrderedDict(layer_dict)\n", - " self.load_state_dict(state_dict, strict=True)\n", - "\n", - "\n", - "def train(\n", - " task_type: str,\n", - " net: CNN,\n", - " trainloader: DataLoader,\n", - " device: torch.device,\n", - " num_iterations: int,\n", - " log_progress: bool = True,\n", - ") -> Tuple[float, float, int]:\n", - " # Define loss and optimizer\n", - " if task_type == \"BINARY\":\n", - " criterion = nn.BCELoss()\n", - " elif task_type == \"REG\":\n", - " criterion = nn.MSELoss()\n", - " # optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-6)\n", - " optimizer = torch.optim.Adam(net.parameters(), lr=0.0001, betas=(0.9, 0.999))\n", - "\n", - " def cycle(iterable):\n", - " \"\"\"Repeats the contents of the train loader, in case it gets exhausted in 'num_iterations'.\"\"\"\n", - " while True:\n", - " for x in iterable:\n", - " yield x\n", - "\n", - " # Train the network\n", - " net.train()\n", - " total_loss, total_result, n_samples = 0.0, 0.0, 0\n", - " pbar = (\n", - " tqdm(iter(cycle(trainloader)), total=num_iterations, desc=f\"TRAIN\")\n", - " if log_progress\n", - " else iter(cycle(trainloader))\n", - " )\n", - "\n", - " # Unusually, this training is formulated in terms of number of updates/iterations/batches processed\n", - " # by the network. 
This will be helpful later on, when partitioning the data across clients: resulting\n", - " # in differences between dataset sizes and hence inconsistent numbers of updates per 'epoch'.\n", - " for i, data in zip(range(num_iterations), pbar):\n", - " tree_outputs, labels = data[0].to(device), data[1].to(device)\n", - " optimizer.zero_grad()\n", - "\n", - " outputs = net(tree_outputs)\n", - " loss = criterion(outputs, labels)\n", - " loss.backward()\n", - " optimizer.step()\n", - "\n", - " # Collected training loss and accuracy statistics\n", - " total_loss += loss.item()\n", - " n_samples += labels.size(0)\n", - "\n", - " if task_type == \"BINARY\":\n", - " acc = Accuracy(task=\"binary\")(outputs, labels.type(torch.int))\n", - " total_result += acc * labels.size(0)\n", - " elif task_type == \"REG\":\n", - " mse = MeanSquaredError()(outputs, labels.type(torch.int))\n", - " total_result += mse * labels.size(0)\n", - "\n", - " if log_progress:\n", - " if task_type == \"BINARY\":\n", - " pbar.set_postfix(\n", - " {\n", - " \"train_loss\": total_loss / n_samples,\n", - " \"train_acc\": total_result / n_samples,\n", - " }\n", - " )\n", - " elif task_type == \"REG\":\n", - " pbar.set_postfix(\n", - " {\n", - " \"train_loss\": total_loss / n_samples,\n", - " \"train_mse\": total_result / n_samples,\n", - " }\n", - " )\n", - " if log_progress:\n", - " print(\"\\n\")\n", - "\n", - " return total_loss / n_samples, total_result / n_samples, n_samples\n", - "\n", - "\n", - "def test(\n", - " task_type: str,\n", - " net: CNN,\n", - " testloader: DataLoader,\n", - " device: torch.device,\n", - " log_progress: bool = True,\n", - ") -> Tuple[float, float, int]:\n", - " \"\"\"Evaluates the network on test data.\"\"\"\n", - " if task_type == \"BINARY\":\n", - " criterion = nn.BCELoss()\n", - " elif task_type == \"REG\":\n", - " criterion = nn.MSELoss()\n", - "\n", - " total_loss, total_result, n_samples = 0.0, 0.0, 0\n", - " net.eval()\n", - " with torch.no_grad():\n", - " pbar = tqdm(testloader, desc=\"TEST\") if log_progress else testloader\n", - " for data in pbar:\n", - " tree_outputs, labels = data[0].to(device), data[1].to(device)\n", - " outputs = net(tree_outputs)\n", - "\n", - " # Collected testing loss and accuracy statistics\n", - " total_loss += criterion(outputs, labels).item()\n", - " n_samples += labels.size(0)\n", - "\n", - " if task_type == \"BINARY\":\n", - " acc = Accuracy(task=\"binary\")(\n", - " outputs.cpu(), labels.type(torch.int).cpu()\n", - " )\n", - " total_result += acc * labels.size(0)\n", - " elif task_type == \"REG\":\n", - " mse = MeanSquaredError()(outputs.cpu(), labels.type(torch.int).cpu())\n", - " total_result += mse * labels.size(0)\n", - "\n", - " if log_progress:\n", - " print(\"\\n\")\n", - "\n", - " return total_loss / n_samples, total_result / n_samples, n_samples" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Create Flower custom client\n", - "## Import Flower custom client relevant modules" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Flower client\n", - "from flwr.common import (\n", - " EvaluateIns,\n", - " EvaluateRes,\n", - " FitIns,\n", - " FitRes,\n", - " GetPropertiesIns,\n", - " GetPropertiesRes,\n", - " GetParametersIns,\n", - " GetParametersRes,\n", - " Status,\n", - " Code,\n", - " parameters_to_ndarrays,\n", - " ndarrays_to_parameters,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - 
"executionInfo": { - "elapsed": 36, - "status": "ok", - "timestamp": 1670363021676, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - } - }, - "outputs": [], - "source": [ - "def tree_encoding_loader(\n", - " dataloader: DataLoader,\n", - " batch_size: int,\n", - " client_trees: Union[\n", - " Tuple[XGBClassifier, int],\n", - " Tuple[XGBRegressor, int],\n", - " List[Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]],\n", - " ],\n", - " client_tree_num: int,\n", - " client_num: int,\n", - ") -> DataLoader:\n", - " encoding = tree_encoding(dataloader, client_trees, client_tree_num, client_num)\n", - " if encoding is None:\n", - " return None\n", - " data, labels = encoding\n", - " tree_dataset = TreeDataset(data, labels)\n", - " return get_dataloader(tree_dataset, \"tree\", batch_size)\n", - "\n", - "\n", - "class FL_Client(fl.client.Client):\n", - " def __init__(\n", - " self,\n", - " task_type: str,\n", - " trainloader: DataLoader,\n", - " valloader: DataLoader,\n", - " client_tree_num: int,\n", - " client_num: int,\n", - " cid: str,\n", - " log_progress: bool = False,\n", - " ):\n", - " \"\"\"\n", - " Creates a client for training `network.Net` on tabular dataset.\n", - " \"\"\"\n", - " self.task_type = task_type\n", - " self.cid = cid\n", - " self.tree = construct_tree_from_loader(trainloader, client_tree_num, task_type)\n", - " self.trainloader_original = trainloader\n", - " self.valloader_original = valloader\n", - " self.trainloader = None\n", - " self.valloader = None\n", - " self.client_tree_num = client_tree_num\n", - " self.client_num = client_num\n", - " self.properties = {\"tensor_type\": \"numpy.ndarray\"}\n", - " self.log_progress = log_progress\n", - "\n", - " # instantiate model\n", - " self.net = CNN()\n", - "\n", - " # determine device\n", - " self.device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n", - "\n", - " def get_properties(self, ins: GetPropertiesIns) -> GetPropertiesRes:\n", - " return GetPropertiesRes(properties=self.properties)\n", - "\n", - " def get_parameters(\n", - " self, ins: GetParametersIns\n", - " ) -> Tuple[\n", - " GetParametersRes, Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]\n", - " ]:\n", - " return [\n", - " GetParametersRes(\n", - " status=Status(Code.OK, \"\"),\n", - " parameters=ndarrays_to_parameters(self.net.get_weights()),\n", - " ),\n", - " (self.tree, int(self.cid)),\n", - " ]\n", - "\n", - " def set_parameters(\n", - " self,\n", - " parameters: Tuple[\n", - " Parameters,\n", - " Union[\n", - " Tuple[XGBClassifier, int],\n", - " Tuple[XGBRegressor, int],\n", - " List[Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]],\n", - " ],\n", - " ],\n", - " ) -> Union[\n", - " Tuple[XGBClassifier, int],\n", - " Tuple[XGBRegressor, int],\n", - " List[Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]],\n", - " ]:\n", - " self.net.set_weights(parameters_to_ndarrays(parameters[0]))\n", - " return parameters[1]\n", - "\n", - " def fit(self, fit_params: FitIns) -> FitRes:\n", - " # Process incoming request to train\n", - " num_iterations = fit_params.config[\"num_iterations\"]\n", - " batch_size = fit_params.config[\"batch_size\"]\n", - " aggregated_trees = self.set_parameters(fit_params.parameters)\n", - "\n", - " if type(aggregated_trees) is list:\n", - " print(\"Client \" + self.cid + \": recieved\", len(aggregated_trees), \"trees\")\n", - " else:\n", - " print(\"Client \" + self.cid + \": only had its own tree\")\n", - " 
self.trainloader = tree_encoding_loader(\n", - " self.trainloader_original,\n", - " batch_size,\n", - " aggregated_trees,\n", - " self.client_tree_num,\n", - " self.client_num,\n", - " )\n", - " self.valloader = tree_encoding_loader(\n", - " self.valloader_original,\n", - " batch_size,\n", - " aggregated_trees,\n", - " self.client_tree_num,\n", - " self.client_num,\n", - " )\n", - "\n", - " # num_iterations = None special behaviour: train(...) runs for a single epoch, however many updates it may be\n", - " num_iterations = num_iterations or len(self.trainloader)\n", - "\n", - " # Train the model\n", - " print(f\"Client {self.cid}: training for {num_iterations} iterations/updates\")\n", - " self.net.to(self.device)\n", - " train_loss, train_result, num_examples = train(\n", - " self.task_type,\n", - " self.net,\n", - " self.trainloader,\n", - " device=self.device,\n", - " num_iterations=num_iterations,\n", - " log_progress=self.log_progress,\n", - " )\n", - " print(\n", - " f\"Client {self.cid}: training round complete, {num_examples} examples processed\"\n", - " )\n", - "\n", - " # Return training information: model, number of examples processed and metrics\n", - " if self.task_type == \"BINARY\":\n", - " return FitRes(\n", - " status=Status(Code.OK, \"\"),\n", - " parameters=self.get_parameters(fit_params.config),\n", - " num_examples=num_examples,\n", - " metrics={\"loss\": train_loss, \"accuracy\": train_result},\n", - " )\n", - " elif self.task_type == \"REG\":\n", - " return FitRes(\n", - " status=Status(Code.OK, \"\"),\n", - " parameters=self.get_parameters(fit_params.config),\n", - " num_examples=num_examples,\n", - " metrics={\"loss\": train_loss, \"mse\": train_result},\n", - " )\n", - "\n", - " def evaluate(self, eval_params: EvaluateIns) -> EvaluateRes:\n", - " # Process incoming request to evaluate\n", - " self.set_parameters(eval_params.parameters)\n", - "\n", - " # Evaluate the model\n", - " self.net.to(self.device)\n", - " loss, result, num_examples = test(\n", - " self.task_type,\n", - " self.net,\n", - " self.valloader,\n", - " device=self.device,\n", - " log_progress=self.log_progress,\n", - " )\n", - "\n", - " # Return evaluation information\n", - " if self.task_type == \"BINARY\":\n", - " print(\n", - " f\"Client {self.cid}: evaluation on {num_examples} examples: loss={loss:.4f}, accuracy={result:.4f}\"\n", - " )\n", - " return EvaluateRes(\n", - " status=Status(Code.OK, \"\"),\n", - " loss=loss,\n", - " num_examples=num_examples,\n", - " metrics={\"accuracy\": result},\n", - " )\n", - " elif self.task_type == \"REG\":\n", - " print(\n", - " f\"Client {self.cid}: evaluation on {num_examples} examples: loss={loss:.4f}, mse={result:.4f}\"\n", - " )\n", - " return EvaluateRes(\n", - " status=Status(Code.OK, \"\"),\n", - " loss=loss,\n", - " num_examples=num_examples,\n", - " metrics={\"mse\": result},\n", - " )" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Create Flower custom server\n", - "## Import Flower custom server relevant modules" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Flower server\n", - "import functools\n", - "from flwr.server.strategy import FedXgbNnAvg\n", - "from flwr.server.app import ServerConfig\n", - "\n", - "import timeit\n", - "from logging import DEBUG, INFO\n", - "from typing import Dict, List, Optional, Tuple, Union\n", - "\n", - "from flwr.common import DisconnectRes, Parameters, ReconnectIns, Scalar\n", - "from 
flwr.common.logger import log\n", - "from flwr.common.typing import GetParametersIns\n", - "from flwr.server.client_manager import ClientManager, SimpleClientManager\n", - "from flwr.server.client_proxy import ClientProxy\n", - "from flwr.server.history import History\n", - "from flwr.server.strategy import Strategy\n", - "from flwr.server.server import (\n", - " reconnect_clients,\n", - " reconnect_client,\n", - " fit_clients,\n", - " fit_client,\n", - " _handle_finished_future_after_fit,\n", - " evaluate_clients,\n", - " evaluate_client,\n", - " _handle_finished_future_after_evaluate,\n", - ")\n", - "\n", - "FitResultsAndFailures = Tuple[\n", - " List[Tuple[ClientProxy, FitRes]],\n", - " List[Union[Tuple[ClientProxy, FitRes], BaseException]],\n", - "]\n", - "EvaluateResultsAndFailures = Tuple[\n", - " List[Tuple[ClientProxy, EvaluateRes]],\n", - " List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],\n", - "]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "class FL_Server(fl.server.Server):\n", - " \"\"\"Flower server.\"\"\"\n", - "\n", - " def __init__(\n", - " self, *, client_manager: ClientManager, strategy: Optional[Strategy] = None\n", - " ) -> None:\n", - " self._client_manager: ClientManager = client_manager\n", - " self.parameters: Parameters = Parameters(\n", - " tensors=[], tensor_type=\"numpy.ndarray\"\n", - " )\n", - " self.strategy: Strategy = strategy\n", - " self.max_workers: Optional[int] = None\n", - "\n", - " # pylint: disable=too-many-locals\n", - " def fit(self, num_rounds: int, timeout: Optional[float]) -> History:\n", - " \"\"\"Run federated averaging for a number of rounds.\"\"\"\n", - " history = History()\n", - "\n", - " # Initialize parameters\n", - " log(INFO, \"Initializing global parameters\")\n", - " self.parameters = self._get_initial_parameters(timeout=timeout)\n", - "\n", - " log(INFO, \"Evaluating initial parameters\")\n", - " res = self.strategy.evaluate(0, parameters=self.parameters)\n", - " if res is not None:\n", - " log(\n", - " INFO,\n", - " \"initial parameters (loss, other metrics): %s, %s\",\n", - " res[0],\n", - " res[1],\n", - " )\n", - " history.add_loss_centralized(server_round=0, loss=res[0])\n", - " history.add_metrics_centralized(server_round=0, metrics=res[1])\n", - "\n", - " # Run federated learning for num_rounds\n", - " log(INFO, \"FL starting\")\n", - " start_time = timeit.default_timer()\n", - "\n", - " for current_round in range(1, num_rounds + 1):\n", - " # Train model and replace previous global model\n", - " res_fit = self.fit_round(server_round=current_round, timeout=timeout)\n", - " if res_fit:\n", - " parameters_prime, _, _ = res_fit # fit_metrics_aggregated\n", - " if parameters_prime:\n", - " self.parameters = parameters_prime\n", - "\n", - " # Evaluate model using strategy implementation\n", - " res_cen = self.strategy.evaluate(current_round, parameters=self.parameters)\n", - " if res_cen is not None:\n", - " loss_cen, metrics_cen = res_cen\n", - " log(\n", - " INFO,\n", - " \"fit progress: (%s, %s, %s, %s)\",\n", - " current_round,\n", - " loss_cen,\n", - " metrics_cen,\n", - " timeit.default_timer() - start_time,\n", - " )\n", - " history.add_loss_centralized(server_round=current_round, loss=loss_cen)\n", - " history.add_metrics_centralized(\n", - " server_round=current_round, metrics=metrics_cen\n", - " )\n", - "\n", - " # Evaluate model on a sample of available clients\n", - " res_fed = self.evaluate_round(server_round=current_round, 
timeout=timeout)\n", - " if res_fed:\n", - " loss_fed, evaluate_metrics_fed, _ = res_fed\n", - " if loss_fed:\n", - " history.add_loss_distributed(\n", - " server_round=current_round, loss=loss_fed\n", - " )\n", - " history.add_metrics_distributed(\n", - " server_round=current_round, metrics=evaluate_metrics_fed\n", - " )\n", - "\n", - " # Bookkeeping\n", - " end_time = timeit.default_timer()\n", - " elapsed = end_time - start_time\n", - " log(INFO, \"FL finished in %s\", elapsed)\n", - " return history\n", - "\n", - " def evaluate_round(\n", - " self,\n", - " server_round: int,\n", - " timeout: Optional[float],\n", - " ) -> Optional[\n", - " Tuple[Optional[float], Dict[str, Scalar], EvaluateResultsAndFailures]\n", - " ]:\n", - " \"\"\"Validate current global model on a number of clients.\"\"\"\n", - "\n", - " # Get clients and their respective instructions from strategy\n", - " client_instructions = self.strategy.configure_evaluate(\n", - " server_round=server_round,\n", - " parameters=self.parameters,\n", - " client_manager=self._client_manager,\n", - " )\n", - " if not client_instructions:\n", - " log(INFO, \"evaluate_round %s: no clients selected, cancel\", server_round)\n", - " return None\n", - " log(\n", - " DEBUG,\n", - " \"evaluate_round %s: strategy sampled %s clients (out of %s)\",\n", - " server_round,\n", - " len(client_instructions),\n", - " self._client_manager.num_available(),\n", - " )\n", - "\n", - " # Collect `evaluate` results from all clients participating in this round\n", - " results, failures = evaluate_clients(\n", - " client_instructions,\n", - " max_workers=self.max_workers,\n", - " timeout=timeout,\n", - " )\n", - " log(\n", - " DEBUG,\n", - " \"evaluate_round %s received %s results and %s failures\",\n", - " server_round,\n", - " len(results),\n", - " len(failures),\n", - " )\n", - "\n", - " # Aggregate the evaluation results\n", - " aggregated_result: Tuple[\n", - " Optional[float],\n", - " Dict[str, Scalar],\n", - " ] = self.strategy.aggregate_evaluate(server_round, results, failures)\n", - "\n", - " loss_aggregated, metrics_aggregated = aggregated_result\n", - " return loss_aggregated, metrics_aggregated, (results, failures)\n", - "\n", - " def fit_round(\n", - " self,\n", - " server_round: int,\n", - " timeout: Optional[float],\n", - " ) -> Optional[\n", - " Tuple[\n", - " Optional[\n", - " Tuple[\n", - " Parameters,\n", - " Union[\n", - " Tuple[XGBClassifier, int],\n", - " Tuple[XGBRegressor, int],\n", - " List[\n", - " Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]\n", - " ],\n", - " ],\n", - " ]\n", - " ],\n", - " Dict[str, Scalar],\n", - " FitResultsAndFailures,\n", - " ]\n", - " ]:\n", - " \"\"\"Perform a single round of federated averaging.\"\"\"\n", - "\n", - " # Get clients and their respective instructions from strategy\n", - " client_instructions = self.strategy.configure_fit(\n", - " server_round=server_round,\n", - " parameters=self.parameters,\n", - " client_manager=self._client_manager,\n", - " )\n", - "\n", - " if not client_instructions:\n", - " log(INFO, \"fit_round %s: no clients selected, cancel\", server_round)\n", - " return None\n", - " log(\n", - " DEBUG,\n", - " \"fit_round %s: strategy sampled %s clients (out of %s)\",\n", - " server_round,\n", - " len(client_instructions),\n", - " self._client_manager.num_available(),\n", - " )\n", - "\n", - " # Collect `fit` results from all clients participating in this round\n", - " results, failures = fit_clients(\n", - " client_instructions=client_instructions,\n", - " 
max_workers=self.max_workers,\n", - " timeout=timeout,\n", - " )\n", - "\n", - " log(\n", - " DEBUG,\n", - " \"fit_round %s received %s results and %s failures\",\n", - " server_round,\n", - " len(results),\n", - " len(failures),\n", - " )\n", - "\n", - " # Aggregate training results\n", - " NN_aggregated: Parameters\n", - " trees_aggregated: Union[\n", - " Tuple[XGBClassifier, int],\n", - " Tuple[XGBRegressor, int],\n", - " List[Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]],\n", - " ]\n", - " metrics_aggregated: Dict[str, Scalar]\n", - " aggregated, metrics_aggregated = self.strategy.aggregate_fit(\n", - " server_round, results, failures\n", - " )\n", - " NN_aggregated, trees_aggregated = aggregated[0], aggregated[1]\n", - "\n", - " if type(trees_aggregated) is list:\n", - " print(\"Server side aggregated\", len(trees_aggregated), \"trees.\")\n", - " else:\n", - " print(\"Server side did not aggregate trees.\")\n", - "\n", - " return (\n", - " [NN_aggregated, trees_aggregated],\n", - " metrics_aggregated,\n", - " (results, failures),\n", - " )\n", - "\n", - " def _get_initial_parameters(\n", - " self, timeout: Optional[float]\n", - " ) -> Tuple[Parameters, Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]]:\n", - " \"\"\"Get initial parameters from one of the available clients.\"\"\"\n", - "\n", - " # Server-side parameter initialization\n", - " parameters: Optional[Parameters] = self.strategy.initialize_parameters(\n", - " client_manager=self._client_manager\n", - " )\n", - " if parameters is not None:\n", - " log(INFO, \"Using initial parameters provided by strategy\")\n", - " return parameters\n", - "\n", - " # Get initial parameters from one of the clients\n", - " log(INFO, \"Requesting initial parameters from one random client\")\n", - " random_client = self._client_manager.sample(1)[0]\n", - " ins = GetParametersIns(config={})\n", - " get_parameters_res_tree = random_client.get_parameters(ins=ins, timeout=timeout)\n", - " parameters = [get_parameters_res_tree[0].parameters, get_parameters_res_tree[1]]\n", - " log(INFO, \"Received initial parameters from one random client\")\n", - "\n", - " return parameters" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Create server-side evaluation and experiment" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "executionInfo": { - "elapsed": 35, - "status": "ok", - "timestamp": 1670363021676, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - } - }, - "outputs": [], - "source": [ - "def print_model_layers(model: nn.Module) -> None:\n", - " print(model)\n", - " for param_tensor in model.state_dict():\n", - " print(param_tensor, \"\\t\", model.state_dict()[param_tensor].size())\n", - "\n", - "\n", - "def serverside_eval(\n", - " server_round: int,\n", - " parameters: Tuple[\n", - " Parameters,\n", - " Union[\n", - " Tuple[XGBClassifier, int],\n", - " Tuple[XGBRegressor, int],\n", - " List[Union[Tuple[XGBClassifier, int], Tuple[XGBRegressor, int]]],\n", - " ],\n", - " ],\n", - " config: Dict[str, Scalar],\n", - " task_type: str,\n", - " testloader: DataLoader,\n", - " batch_size: int,\n", - " client_tree_num: int,\n", - " client_num: int,\n", - ") -> Tuple[float, Dict[str, float]]:\n", - " \"\"\"An evaluation function for centralized/serverside evaluation over the entire test set.\"\"\"\n", - " # device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n", - " device = 
\"cpu\"\n", - " model = CNN()\n", - " # print_model_layers(model)\n", - "\n", - " model.set_weights(parameters_to_ndarrays(parameters[0]))\n", - " model.to(device)\n", - "\n", - " trees_aggregated = parameters[1]\n", - " testloader = tree_encoding_loader(\n", - " testloader, batch_size, trees_aggregated, client_tree_num, client_num\n", - " )\n", - " loss, result, _ = test(\n", - " task_type, model, testloader, device=device, log_progress=False\n", - " )\n", - "\n", - " if task_type == \"BINARY\":\n", - " print(\n", - " f\"Evaluation on the server: test_loss={loss:.4f}, test_accuracy={result:.4f}\"\n", - " )\n", - " return loss, {\"accuracy\": result}\n", - " elif task_type == \"REG\":\n", - " print(f\"Evaluation on the server: test_loss={loss:.4f}, test_mse={result:.4f}\")\n", - " return loss, {\"mse\": result}\n", - "\n", - "\n", - "def start_experiment(\n", - " task_type: str,\n", - " trainset: Dataset,\n", - " testset: Dataset,\n", - " num_rounds: int = 5,\n", - " client_tree_num: int = 50,\n", - " client_pool_size: int = 5,\n", - " num_iterations: int = 100,\n", - " fraction_fit: float = 1.0,\n", - " min_fit_clients: int = 2,\n", - " batch_size: int = 32,\n", - " val_ratio: float = 0.1,\n", - ") -> History:\n", - " client_resources = {\"num_cpus\": 0.5} # 2 clients per CPU\n", - "\n", - " # Partition the dataset into subsets reserved for each client.\n", - " # - 'val_ratio' controls the proportion of the (local) client reserved as a local test set\n", - " # (good for testing how the final model performs on the client's local unseen data)\n", - " trainloaders, valloaders, testloader = do_fl_partitioning(\n", - " trainset,\n", - " testset,\n", - " batch_size=\"whole\",\n", - " pool_size=client_pool_size,\n", - " val_ratio=val_ratio,\n", - " )\n", - " print(\n", - " f\"Data partitioned across {client_pool_size} clients\"\n", - " f\" and {val_ratio} of local dataset reserved for validation.\"\n", - " )\n", - "\n", - " # Configure the strategy\n", - " def fit_config(server_round: int) -> Dict[str, Scalar]:\n", - " print(f\"Configuring round {server_round}\")\n", - " return {\n", - " \"num_iterations\": num_iterations,\n", - " \"batch_size\": batch_size,\n", - " }\n", - "\n", - " # FedXgbNnAvg\n", - " strategy = FedXgbNnAvg(\n", - " fraction_fit=fraction_fit,\n", - " fraction_evaluate=fraction_fit if val_ratio > 0.0 else 0.0,\n", - " min_fit_clients=min_fit_clients,\n", - " min_evaluate_clients=min_fit_clients,\n", - " min_available_clients=client_pool_size, # all clients should be available\n", - " on_fit_config_fn=fit_config,\n", - " on_evaluate_config_fn=(lambda r: {\"batch_size\": batch_size}),\n", - " evaluate_fn=functools.partial(\n", - " serverside_eval,\n", - " task_type=task_type,\n", - " testloader=testloader,\n", - " batch_size=batch_size,\n", - " client_tree_num=client_tree_num,\n", - " client_num=client_num,\n", - " ),\n", - " accept_failures=False,\n", - " )\n", - "\n", - " print(\n", - " f\"FL experiment configured for {num_rounds} rounds with {client_pool_size} client in the pool.\"\n", - " )\n", - " print(\n", - " f\"FL round will proceed with {fraction_fit * 100}% of clients sampled, at least {min_fit_clients}.\"\n", - " )\n", - "\n", - " def client_fn(cid: str) -> fl.client.Client:\n", - " \"\"\"Creates a federated learning client\"\"\"\n", - " if val_ratio > 0.0 and val_ratio <= 1.0:\n", - " return FL_Client(\n", - " task_type,\n", - " trainloaders[int(cid)],\n", - " valloaders[int(cid)],\n", - " client_tree_num,\n", - " client_pool_size,\n", - " cid,\n", - " 
log_progress=False,\n", - " )\n", - " else:\n", - " return FL_Client(\n", - " task_type,\n", - " trainloaders[int(cid)],\n", - " None,\n", - " client_tree_num,\n", - " client_pool_size,\n", - " cid,\n", - " log_progress=False,\n", - " )\n", - "\n", - " # Start the simulation\n", - " history = fl.simulation.start_simulation(\n", - " client_fn=client_fn,\n", - " server=FL_Server(client_manager=SimpleClientManager(), strategy=strategy),\n", - " num_clients=client_pool_size,\n", - " client_resources=client_resources,\n", - " config=ServerConfig(num_rounds=num_rounds),\n", - " strategy=strategy,\n", - " )\n", - "\n", - " print(history)\n", - "\n", - " return history" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Start federated training and inference\n", - "#### High-level workflow: \n", - "#### At round 1, each client first builds its own local XGBoost tree and sends it to the server. The server aggregates all trees and sends them to all clients. \n", - "#### After round 1, each client calculates every other client's tree prediction results, and trains a convolutional neural network with 1D convolution kernel size == the number of XGBoost trees in the tree ensemble. \n", - "#### The sharing of privacy-sensitive information is not needed, and the learning rate (a hyperparameter for XGBoost) is learnable using 1D convolution." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/", - "height": 624 - }, - "executionInfo": { - "elapsed": 7610, - "status": "error", - "timestamp": 1670363029252, - "user": { - "displayName": "Chenyang Ma", - "userId": "17975430055716133031" - }, - "user_tz": 0 - }, - "outputId": "ee2b7146-07ec-4f97-ba44-5b12b35bbeaf" - }, - "outputs": [], - "source": [ - "start_experiment(\n", - " task_type=task_type,\n", - " trainset=trainset,\n", - " testset=testset,\n", - " num_rounds=20,\n", - " client_tree_num=client_tree_num,\n", - " client_pool_size=client_num,\n", - " num_iterations=100,\n", - " batch_size=64,\n", - " fraction_fit=1.0,\n", - " min_fit_clients=1,\n", - " val_ratio=0.0,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "colab": { - "provenance": [] - }, - "gpuClass": "premium", - "kernelspec": { - "display_name": "FedXGBoost", - "language": "python", - "name": "python3" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} diff --git a/src/py/flwr/common/logger.py b/src/py/flwr/common/logger.py index 29d1562a86d3..50c902da38b5 100644 --- a/src/py/flwr/common/logger.py +++ b/src/py/flwr/common/logger.py @@ -111,3 +111,17 @@ def warn_experimental_feature(name: str) -> None: """, name, ) + + +def warn_deprecated_feature(name: str) -> None: + """Warn the user when they use a deprecated feature.""" + log( + WARN, + """ + DEPRECATED FEATURE: %s + + This is a deprecated feature. It will be removed + entirely in future versions of Flower. 
+ """, + name, + ) diff --git a/src/py/flwr/server/strategy/fedxgb_nn_avg.py b/src/py/flwr/server/strategy/fedxgb_nn_avg.py index f300633d0d9f..8dedc925f350 100644 --- a/src/py/flwr/server/strategy/fedxgb_nn_avg.py +++ b/src/py/flwr/server/strategy/fedxgb_nn_avg.py @@ -25,7 +25,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union from flwr.common import FitRes, Scalar, ndarrays_to_parameters, parameters_to_ndarrays -from flwr.common.logger import log +from flwr.common.logger import log, warn_deprecated_feature from flwr.server.client_proxy import ClientProxy from .aggregate import aggregate @@ -33,7 +33,13 @@ class FedXgbNnAvg(FedAvg): - """Configurable FedXgbNnAvg strategy implementation.""" + """Configurable FedXgbNnAvg strategy implementation. + + Warning + ------- + This strategy is deprecated, but a copy of it is available in Flower Baselines: + https://github.com/adap/flower/tree/main/baselines/hfedxgboost. + """ def __init__(self, *args: Any, **kwargs: Any) -> None: """Federated XGBoost [Ma et al., 2023] strategy. @@ -41,6 +47,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: Implementation based on https://arxiv.org/abs/2304.07537. """ super().__init__(*args, **kwargs) + warn_deprecated_feature("`FedXgbNnAvg` strategy") def __repr__(self) -> str: """Compute a string representation of the strategy."""