diff --git a/src/py/flwr/common/differential_privacy.py b/src/py/flwr/common/differential_privacy.py
new file mode 100644
index 000000000000..c6ec45d30f8e
--- /dev/null
+++ b/src/py/flwr/common/differential_privacy.py
@@ -0,0 +1,87 @@
+# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Utility functions for differential privacy."""
+
+
+import numpy as np
+
+from flwr.common import (
+    NDArrays,
+    Parameters,
+    ndarrays_to_parameters,
+    parameters_to_ndarrays,
+)
+
+
+def get_norm(input_arrays: NDArrays) -> float:
+    """Compute the L2 norm of the flattened input."""
+    array_norms = [np.linalg.norm(array.flat) for array in input_arrays]
+    # pylint: disable=consider-using-generator
+    return float(np.sqrt(sum([norm**2 for norm in array_norms])))
+
+
+def add_gaussian_noise_inplace(input_arrays: NDArrays, std_dev: float) -> None:
+    """Add Gaussian noise to each element of the input arrays."""
+    for array in input_arrays:
+        array += np.random.normal(0, std_dev, array.shape)
+
+
+def clip_inputs_inplace(input_arrays: NDArrays, clipping_norm: float) -> None:
+    """Clip model update based on the clipping norm in-place.
+
+    FlatClip method of the paper: https://arxiv.org/abs/1710.06963
+    """
+    input_norm = get_norm(input_arrays)
+    scaling_factor = min(1, clipping_norm / input_norm)
+    for array in input_arrays:
+        array *= scaling_factor
+
+
+def compute_stdv(
+    noise_multiplier: float, clipping_norm: float, num_sampled_clients: int
+) -> float:
+    """Compute standard deviation for noise addition.
+
+    Paper: https://arxiv.org/abs/1710.06963
+    """
+    return float((noise_multiplier * clipping_norm) / num_sampled_clients)
+
+
+def compute_clip_model_update(
+    param1: NDArrays, param2: NDArrays, clipping_norm: float
+) -> None:
+    """Compute model update (param1 - param2) and clip it.
+
+    Then add the clipped value to param2 and store it in param1 (in-place)."""
+    model_update = [np.subtract(x, y) for (x, y) in zip(param1, param2)]
+    clip_inputs_inplace(model_update, clipping_norm)
+
+    for i, _ in enumerate(param2):
+        param1[i] = param2[i] + model_update[i]
+
+
+def add_gaussian_noise_to_params(
+    model_params: Parameters,
+    noise_multiplier: float,
+    clipping_norm: float,
+    num_sampled_clients: int,
+) -> Parameters:
+    """Add Gaussian noise to model parameters."""
+    model_params_ndarrays = parameters_to_ndarrays(model_params)
+    add_gaussian_noise_inplace(
+        model_params_ndarrays,
+        compute_stdv(noise_multiplier, clipping_norm, num_sampled_clients),
+    )
+    return ndarrays_to_parameters(model_params_ndarrays)
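A minimal sketch of how the helpers above compose (standalone usage for illustration only; inside Flower they are driven by the server-side strategy wrapper added later in this diff, and the variable names and array values here are made up):

import numpy as np

from flwr.common import ndarrays_to_parameters
from flwr.common.differential_privacy import (
    add_gaussian_noise_to_params,
    compute_clip_model_update,
    compute_stdv,
)

clipping_norm = 1.0
noise_multiplier = 1.0
num_sampled_clients = 5

previous_params = [np.zeros((2, 2)), np.zeros(3)]   # parameters sent to a client
client_params = [np.ones((2, 2)), np.full(3, 2.0)]  # parameters returned by the client

# Clip the update (client_params - previous_params) to clipping_norm and
# overwrite client_params with previous_params + clipped update.
compute_clip_model_update(client_params, previous_params, clipping_norm)

# Noise with std dev = noise_multiplier * clipping_norm / num_sampled_clients
# is then added to the (aggregated) parameters.
print(compute_stdv(noise_multiplier, clipping_norm, num_sampled_clients))  # 0.2
noised_params = add_gaussian_noise_to_params(
    ndarrays_to_parameters(client_params),
    noise_multiplier,
    clipping_norm,
    num_sampled_clients,
)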
diff --git a/src/py/flwr/common/differential_privacy_constants.py b/src/py/flwr/common/differential_privacy_constants.py
new file mode 100644
index 000000000000..9ec080975aa3
--- /dev/null
+++ b/src/py/flwr/common/differential_privacy_constants.py
@@ -0,0 +1,22 @@
+# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Constants for differential privacy."""
+
+CLIENTS_DISCREPANCY_WARNING = (
+    "The number of clients returning parameters (%s)"
+    " differs from the number of sampled clients (%s)."
+    " This could impact the differential privacy guarantees,"
+    " potentially leading to privacy leakage or inadequate noise calibration."
+)
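The %s placeholders above are filled in lazily by the logger. A short sketch of how the constant is consumed, mirroring its use in DifferentialPrivacyServerSideFixedClipping.aggregate_fit later in this diff (the client counts are made up):

from logging import WARNING

from flwr.common.differential_privacy_constants import CLIENTS_DISCREPANCY_WARNING
from flwr.common.logger import log

# 8 results were received although 10 clients were expected to be sampled
log(WARNING, CLIENTS_DISCREPANCY_WARNING, 8, 10)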
diff --git a/src/py/flwr/common/differential_privacy_test.py b/src/py/flwr/common/differential_privacy_test.py
new file mode 100644
index 000000000000..32b7bc9e4b36
--- /dev/null
+++ b/src/py/flwr/common/differential_privacy_test.py
@@ -0,0 +1,138 @@
+# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Differential Privacy (DP) utility functions tests."""
+
+
+import numpy as np
+
+from .differential_privacy import (
+    add_gaussian_noise_inplace,
+    clip_inputs_inplace,
+    compute_clip_model_update,
+    compute_stdv,
+    get_norm,
+)
+
+
+def test_add_gaussian_noise_inplace() -> None:
+    """Test add_gaussian_noise_inplace function."""
+    # Prepare
+    update = [np.array([[1.0, 2.0], [3.0, 4.0]]), np.array([[5.0, 6.0], [7.0, 8.0]])]
+    std_dev = 0.1
+
+    # Execute
+    add_gaussian_noise_inplace(update, std_dev)
+
+    # Assert
+    # Check that the shape of the result is the same as the input
+    for layer in update:
+        assert layer.shape == (2, 2)
+
+    # Check that the values have been changed and are not equal to the original update
+    for layer in update:
+        assert not np.array_equal(
+            layer, [[1.0, 2.0], [3.0, 4.0]]
+        ) and not np.array_equal(layer, [[5.0, 6.0], [7.0, 8.0]])
+
+    # Check that the noise has been added
+    for layer in update:
+        noise_added = (
+            layer - np.array([[1.0, 2.0], [3.0, 4.0]])
+            if np.array_equal(layer, [[1.0, 2.0], [3.0, 4.0]])
+            else layer - np.array([[5.0, 6.0], [7.0, 8.0]])
+        )
+        assert np.any(np.abs(noise_added) > 0)
+
+
+def test_get_norm() -> None:
+    """Test get_norm function."""
+    # Prepare
+    update = [np.array([[1, 2], [3, 4]]), np.array([[5, 6], [7, 8]])]
+
+    # Execute
+    result = get_norm(update)
+
+    expected = float(
+        np.linalg.norm(np.concatenate([sub_update.flatten() for sub_update in update]))
+    )
+
+    # Assert
+    assert expected == result
+
+
+def test_clip_inputs_inplace() -> None:
+    """Test clip_inputs_inplace function."""
+    # Prepare
+    updates = [
+        np.array([[1.5, -0.5], [2.0, -1.0]]),
+        np.array([0.5, -0.5]),
+        np.array([[-0.5, 1.5], [-1.0, 2.0]]),
+        np.array([-0.5, 0.5]),
+    ]
+    clipping_norm = 1.5
+
+    original_updates = [np.copy(update) for update in updates]
+
+    # Execute
+    clip_inputs_inplace(updates, clipping_norm)
+
+    # Assert
+    for updated, original_update in zip(updates, original_updates):
+        clip_norm = np.linalg.norm(original_update)
+        assert np.all(updated <= clip_norm) and np.all(updated >= -clip_norm)
+
+
+def test_compute_stdv() -> None:
+    """Test compute_stdv function."""
+    # Prepare
+    noise_multiplier = 1.0
+    clipping_norm = 0.5
+    num_sampled_clients = 10
+
+    # Execute
+    stdv = compute_stdv(noise_multiplier, clipping_norm, num_sampled_clients)
+
+    # Assert
+    expected_stdv = float((noise_multiplier * clipping_norm) / num_sampled_clients)
+    assert stdv == expected_stdv
+
+
+def test_compute_clip_model_update() -> None:
+    """Test compute_clip_model_update function."""
+    # Prepare
+    param1 = [
+        np.array([0.5, 1.5, 2.5]),
+        np.array([3.5, 4.5, 5.5]),
+        np.array([6.5, 7.5, 8.5]),
+    ]
+    param2 = [
+        np.array([1.0, 2.0, 3.0]),
+        np.array([4.0, 5.0, 6.0]),
+        np.array([7.0, 8.0, 9.0]),
+    ]
+    clipping_norm = 4
+
+    expected_result = [
+        np.array([0.5, 1.5, 2.5]),
+        np.array([3.5, 4.5, 5.5]),
+        np.array([6.5, 7.5, 8.5]),
+    ]
+
+    # Execute
+    compute_clip_model_update(param1, param2, clipping_norm)
+
+    # Verify
+    for i, param in enumerate(param1):
+        np.testing.assert_array_almost_equal(param, expected_result[i])
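A quick worked check of two of the expectations above (plain arithmetic, not part of the test suite):

import numpy as np

# test_get_norm: the flattened input is [1, 2, ..., 8], so the L2 norm is
# sqrt(1 + 4 + 9 + 16 + 25 + 36 + 49 + 64) = sqrt(204), about 14.28.
assert np.isclose(np.linalg.norm(np.arange(1, 9)), np.sqrt(204))

# test_compute_clip_model_update: the raw update param1 - param2 is -0.5 in every
# position, so its norm is sqrt(9 * 0.25) = 1.5, which is below clipping_norm = 4.
# No scaling occurs, and param2 + update reproduces the original param1 values,
# which is why expected_result equals the initial param1.
assert np.isclose(np.linalg.norm(np.full(9, -0.5)), 1.5)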
diff --git a/src/py/flwr/server/strategy/__init__.py b/src/py/flwr/server/strategy/__init__.py
index 1750a7522379..a31f5d48b77c 100644
--- a/src/py/flwr/server/strategy/__init__.py
+++ b/src/py/flwr/server/strategy/__init__.py
@@ -16,6 +16,7 @@
 
 
 from .bulyan import Bulyan as Bulyan
+from .dp_fixed_clipping import DifferentialPrivacyServerSideFixedClipping
 from .dpfedavg_adaptive import DPFedAvgAdaptive as DPFedAvgAdaptive
 from .dpfedavg_fixed import DPFedAvgFixed as DPFedAvgFixed
 from .fault_tolerant_fedavg import FaultTolerantFedAvg as FaultTolerantFedAvg
@@ -57,4 +58,5 @@
     "DPFedAvgAdaptive",
     "DPFedAvgFixed",
     "Strategy",
+    "DifferentialPrivacyServerSideFixedClipping",
 ]
diff --git a/src/py/flwr/server/strategy/dp_fixed_clipping.py b/src/py/flwr/server/strategy/dp_fixed_clipping.py
new file mode 100644
index 000000000000..d18a6b2079d9
--- /dev/null
+++ b/src/py/flwr/server/strategy/dp_fixed_clipping.py
@@ -0,0 +1,187 @@
+# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Central differential privacy with fixed clipping.
+
+Papers: https://arxiv.org/abs/1712.07557, https://arxiv.org/abs/1710.06963
+"""
+
+
+from logging import WARNING
+from typing import Dict, List, Optional, Tuple, Union
+
+from flwr.common import (
+    EvaluateIns,
+    EvaluateRes,
+    FitIns,
+    FitRes,
+    NDArrays,
+    Parameters,
+    Scalar,
+    ndarrays_to_parameters,
+    parameters_to_ndarrays,
+)
+from flwr.common.differential_privacy import (
+    add_gaussian_noise_to_params,
+    compute_clip_model_update,
+)
+from flwr.common.differential_privacy_constants import CLIENTS_DISCREPANCY_WARNING
+from flwr.common.logger import log
+from flwr.server.client_manager import ClientManager
+from flwr.server.client_proxy import ClientProxy
+from flwr.server.strategy.strategy import Strategy
+
+
+class DifferentialPrivacyServerSideFixedClipping(Strategy):
+    """Wrapper for Central DP with Server Side Fixed Clipping.
+
+    Parameters
+    ----------
+    strategy : Strategy
+        The strategy to which DP functionalities will be added by this wrapper.
+    noise_multiplier : float
+        The noise multiplier for the Gaussian mechanism for model updates.
+        A value of 1.0 or higher is recommended for strong privacy.
+    clipping_norm : float
+        The value of the clipping norm.
+    num_sampled_clients : int
+        The number of clients that are sampled in each round.
+
+    Examples
+    --------
+    Create a strategy:
+
+    >>> strategy = fl.server.strategy.FedAvg( ... )
+
+    Wrap the strategy with the DifferentialPrivacyServerSideFixedClipping wrapper:
+
+    >>> dp_strategy = DifferentialPrivacyServerSideFixedClipping(
+    >>>     strategy, cfg.noise_multiplier, cfg.clipping_norm, cfg.num_sampled_clients
+    >>> )
+    """
+
+    # pylint: disable=too-many-arguments,too-many-instance-attributes
+    def __init__(
+        self,
+        strategy: Strategy,
+        noise_multiplier: float,
+        clipping_norm: float,
+        num_sampled_clients: int,
+    ) -> None:
+        super().__init__()
+
+        self.strategy = strategy
+
+        if noise_multiplier < 0:
+            raise ValueError("The noise multiplier should be a non-negative value.")
+
+        if clipping_norm <= 0:
+            raise ValueError("The clipping norm should be a positive value.")
+
+        if num_sampled_clients <= 0:
+            raise ValueError(
+                "The number of sampled clients should be a positive value."
+            )
+
+        self.noise_multiplier = noise_multiplier
+        self.clipping_norm = clipping_norm
+        self.num_sampled_clients = num_sampled_clients
+
+        self.current_round_params: NDArrays = []
+
+    def __repr__(self) -> str:
+        """Compute a string representation of the strategy."""
+        rep = "Differential Privacy Strategy Wrapper (Server-Side Fixed Clipping)"
+        return rep
+
+    def initialize_parameters(
+        self, client_manager: ClientManager
+    ) -> Optional[Parameters]:
+        """Initialize global model parameters using the given strategy."""
+        return self.strategy.initialize_parameters(client_manager)
+
+    def configure_fit(
+        self, server_round: int, parameters: Parameters, client_manager: ClientManager
+    ) -> List[Tuple[ClientProxy, FitIns]]:
+        """Configure the next round of training."""
+        self.current_round_params = parameters_to_ndarrays(parameters)
+        return self.strategy.configure_fit(server_round, parameters, client_manager)
+
+    def configure_evaluate(
+        self, server_round: int, parameters: Parameters, client_manager: ClientManager
+    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
+        """Configure the next round of evaluation."""
+        return self.strategy.configure_evaluate(
+            server_round, parameters, client_manager
+        )
+
+    def aggregate_fit(
+        self,
+        server_round: int,
+        results: List[Tuple[ClientProxy, FitRes]],
+        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
+    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
+        """Compute the updates, clip, and pass them for aggregation.
+
+        Afterward, add noise to the aggregated parameters.
+        """
+        if failures:
+            return None, {}
+
+        if len(results) != self.num_sampled_clients:
+            log(
+                WARNING,
+                CLIENTS_DISCREPANCY_WARNING,
+                len(results),
+                self.num_sampled_clients,
+            )
+        for _, res in results:
+            param = parameters_to_ndarrays(res.parameters)
+            # Compute and clip update
+            compute_clip_model_update(
+                param, self.current_round_params, self.clipping_norm
+            )
+            # Convert back to parameters
+            res.parameters = ndarrays_to_parameters(param)
+
+        # Pass the new parameters for aggregation
+        aggregated_params, metrics = self.strategy.aggregate_fit(
+            server_round, results, failures
+        )
+
+        # Add Gaussian noise to the aggregated parameters
+        if aggregated_params:
+            aggregated_params = add_gaussian_noise_to_params(
+                aggregated_params,
+                self.noise_multiplier,
+                self.clipping_norm,
+                self.num_sampled_clients,
+            )
+
+        return aggregated_params, metrics
+
+    def aggregate_evaluate(
+        self,
+        server_round: int,
+        results: List[Tuple[ClientProxy, EvaluateRes]],
+        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
+    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
+        """Aggregate evaluation losses using the given strategy."""
+        return self.strategy.aggregate_evaluate(server_round, results, failures)
+
+    def evaluate(
+        self, server_round: int, parameters: Parameters
+    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
+        """Evaluate model parameters using an evaluation function from the strategy."""
+        return self.strategy.evaluate(server_round, parameters)
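A minimal end-to-end sketch of wiring the new wrapper into a Flower server (the FedAvg arguments, server address, and round count are illustrative placeholders, not values from this PR):

import flwr as fl

from flwr.server.strategy import DifferentialPrivacyServerSideFixedClipping

# Base strategy; the client counts here are illustrative
strategy = fl.server.strategy.FedAvg(
    min_fit_clients=10,
    min_available_clients=10,
)

# Wrap it so updates are clipped server-side and noise is added after aggregation
dp_strategy = DifferentialPrivacyServerSideFixedClipping(
    strategy,
    noise_multiplier=1.0,
    clipping_norm=1.0,
    num_sampled_clients=10,
)

fl.server.start_server(
    server_address="0.0.0.0:8080",
    config=fl.server.ServerConfig(num_rounds=3),
    strategy=dp_strategy,
)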