Optuna #208

Merged: 9 commits, Feb 10, 2025
12 changes: 5 additions & 7 deletions examples/gia/huang_optuna/main.py
@@ -2,16 +2,14 @@

from cifar import get_cifar10_loader

-from leakpro.fl_utils.gia_train import train
-from leakpro.run import huang_optuna
+from leakpro.attacks.gia_attacks.huang import Huang
from model import ResNet, PreActBlock

if __name__ == "__main__":
    # Instantiate the base model and get the CIFAR-10 loader

-    # Pre activation required for the attack to give decent results
+    # Pre-activation is required for this attack to give decent results
    base_model = ResNet(PreActBlock, [2, 2, 2, 2], num_classes=10)
-    cifar10_loader, mean, std = get_cifar10_loader(num_images=16, batch_size=16, num_workers=2)
+    client_loader, mean, std = get_cifar10_loader(num_images=16, batch_size=16, num_workers=2)

    # Run Optuna optimization with Huang
-    huang_optuna(base_model, cifar10_loader, train, mean, std)
+    attack_object = Huang(base_model, client_loader, mean, std)
+    attack_object.run_with_optuna()
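With this change the example drives the attack through a `Huang` object instead of the old `huang_optuna` helper. A minimal sketch of the two entry points the new API exposes, based only on the code in this PR (`run_attack()` yields `(iteration, intermediate SSIM, GIAResults-or-None)`, and only the final yield carries a populated `GIAResults`):

```python
# Sketch: two ways to drive the new attack object (assumes the imports above).
attack_object = Huang(base_model, client_loader, mean, std)

# 1) Hyperparameter search: wraps the attack in an Optuna study.
attack_object.run_with_optuna()

# 2) Single attack run: consume the run_attack() generator directly.
attack_object.reset_attack()  # restore the model copy and best-loss state
result = None
for iteration, ssim, gia_result in attack_object.run_attack():
    if gia_result is not None:  # final yield carries the GIAResults
        result = gia_result
```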
2 changes: 1 addition & 1 deletion examples/gia/inverting_celebA_1_image/main.py
@@ -20,7 +20,7 @@

configs = InvertingConfig()
configs.at_iterations = 24000
-configs.total_variation = 1.0e-06
+configs.tv_reg = 1.0e-06
configs.attack_lr = 0.1
configs.criterion = BCEWithLogitsLoss() #CrossEntropyLoss()
configs.epochs = 1
30 changes: 30 additions & 0 deletions leakpro/attacks/attack_base.py
@@ -0,0 +1,30 @@
"""Run optuna to find best hyperparameters."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Self

import optuna


class AbstractAttack(ABC):
"""Abstract attack template for attack objects."""

@abstractmethod
def run_with_optuna(self:Self, optuna_config: Optional[dataclass]) -> optuna.study.Study:
"""Fins optimal hyperparameters using optuna."""
pass

@abstractmethod
def run_attack() -> None:
"""Run the attack on the target model and dataset.

This method is implemented by subclasses (e.g., GIA and MIA attacks),
each of which provides specific behavior and results.

Returns
-------
Depends on the subclass implementation.

"""
pass
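For orientation, a minimal sketch of a concrete subclass satisfying this template; the class name and the trivial method bodies are hypothetical illustrations, not part of this PR:

```python
import optuna

from leakpro.attacks.attack_base import AbstractAttack


class DummyAttack(AbstractAttack):
    """Hypothetical no-op attack illustrating the AbstractAttack interface."""

    def run_attack(self) -> None:
        # A real attack would evaluate the target model here.
        print("running attack")

    def run_with_optuna(self, optuna_config=None) -> optuna.study.Study:
        # A real attack would re-run itself under the hyperparameters
        # suggested by each trial; here the objective is a placeholder.
        study = optuna.create_study()
        study.optimize(lambda trial: 0.0, n_trials=1)
        return study
```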

99 changes: 83 additions & 16 deletions leakpro/attacks/gia_attacks/abstract_gia.py
@@ -1,16 +1,25 @@
"""Module that contains the abstract class for constructing and performing a membership inference attack on a target."""

from abc import ABC, abstractmethod

from abc import abstractmethod
from collections.abc import Generator
from copy import deepcopy
from typing import Callable, Optional

import optuna
import torch
from torch import Tensor
from torch.utils.data import DataLoader

from leakpro.attacks.attack_base import AbstractAttack
from leakpro.fl_utils.model_utils import MedianPool2d
from leakpro.fl_utils.similarity_measurements import dataloaders_psnr, dataloaders_ssim_ignite
from leakpro.hyperparameter_tuning.optuna import OptunaConfig, optuna_optimal_hyperparameters
from leakpro.metrics.attack_result import GIAResults
from leakpro.utils.import_helper import Self

########################################################################################################################
# METRIC CLASS
########################################################################################################################
from leakpro.utils.logger import logger


class AbstractGIA(ABC):
class AbstractGIA(AbstractAttack):
"""Interface to construct and perform a gradient inversion attack on a target model and dataset.

This serves as a guideline for implementing a metric to be used for measuring the privacy leakage of a target model.
@@ -57,17 +66,75 @@ def prepare_attack(self:Self) -> None:
        pass

    @abstractmethod
-    def run_attack(self:Self) -> GIAResults:
-        """Run the metric on the target model and dataset. This method handles all the computations related to the audit dataset.
-
-        Args:
-        ----
-            fpr_tolerance_rate_list (optional): List of FPR tolerance values that may be used by the threshold function
-                to compute the attack threshold for the metric.
+    def run_attack(self:Self) -> Generator[tuple[int, Tensor, GIAResults]]:
+        """Run the GIA attack.

-        Returns:
+        Returns
        -------
-            Result(s) of the metric.
+            Generator with intermediary results to allow for lazy evaluation during
+            hyperparameter tuning, and the final GIAResults.

        """
        pass

+    @abstractmethod
+    def reset_attack(self: Self) -> None:
+        """Reset the attack to its initial state."""
+        pass
+
+    @abstractmethod
+    def suggest_parameters(self: Self, trial: optuna.trial.Trial) -> None:
+        """Apply and suggest new hyperparameters for the attack using an optuna trial."""
+        pass
+
+    @abstractmethod
+    def get_configs(self:Self) -> dict:
+        """Get the configs used for the attack."""
+        pass
+
+    def run_with_optuna(self:Self, optuna_config: Optional[OptunaConfig] = None) -> optuna.study.Study:
+        """Find optimal hyperparameters using optuna."""
+        if optuna_config is None:
+            optuna_config = OptunaConfig()
+        return optuna_optimal_hyperparameters(self, optuna_config)
+
+    def generic_attack_loop(self: Self, configs: dict, gradient_closure: Callable, at_iterations: int,
+                            reconstruction: Tensor, data_mean: Tensor, data_std: Tensor, attack_lr: float,
+                            median_pooling: bool, client_loader: DataLoader, reconstruction_loader: DataLoader
+                            ) -> Generator[tuple[int, Tensor, GIAResults]]:
+        """Generic attack loop for GIAs."""
+        optimizer = torch.optim.Adam([reconstruction], lr=attack_lr)
+        # drop the LR by 10x at roughly 37.5%, 62.5%, and 87.5% of the total iterations
+        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
+                                                         milestones=[at_iterations // 2.667,
+                                                                     at_iterations // 1.6,
+                                                                     at_iterations // 1.142], gamma=0.1)
+        for i in range(at_iterations):
+            # closure that trains on the reconstruction and measures the distance
+            # from the resulting gradients to the real client gradients
+            closure = gradient_closure(optimizer)
+
+            loss = optimizer.step(closure)
+            scheduler.step()
+            with torch.no_grad():
+                # force pixels to be in reasonable ranges
+                reconstruction.data = torch.max(
+                    torch.min(reconstruction, (1 - data_mean) / data_std), -data_mean / data_std
+                )
+                if (i + 1) % 500 == 0 and median_pooling:
+                    reconstruction.data = MedianPool2d(kernel_size=3, stride=1, padding=1, same=False)(reconstruction)
+            # choose the image that has given the least loss
+            if loss < self.best_loss:
+                self.best_loss = loss
+                self.best_reconstruction = deepcopy(reconstruction_loader)
+                self.best_reconstruction_round = i
+                logger.info(f"New best loss: {loss} on round: {i}")
+            if i % 250 == 0:
+                logger.info(f"Iteration {i}, loss {loss}")
+                yield i, dataloaders_ssim_ignite(client_loader, self.best_reconstruction), None
+
+        ssim_score = dataloaders_ssim_ignite(client_loader, self.best_reconstruction)
+        psnr_score = dataloaders_psnr(client_loader, self.best_reconstruction)
+        gia_result = GIAResults(client_loader, self.best_reconstruction,
+                                psnr_score=psnr_score, ssim_score=ssim_score,
+                                data_mean=data_mean, data_std=data_std, config=configs)
+        yield i, ssim_score, gia_result
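The generator-based `run_attack` is what makes lazy evaluation during hyperparameter tuning possible: an Optuna objective can report the intermediate SSIM scores and prune unpromising trials early. A minimal sketch of such an objective, assuming only the interfaces defined in this PR; the wiring is illustrative and is not the actual `optuna_optimal_hyperparameters` implementation:

```python
from typing import Callable

import optuna


def make_objective(attack) -> Callable[[optuna.trial.Trial], float]:
    """Build an Optuna objective that prunes on intermediate SSIM scores."""
    def objective(trial: optuna.trial.Trial) -> float:
        attack.suggest_parameters(trial)  # trial picks e.g. tv_reg, bn_reg
        attack.reset_attack()             # fresh model copy and best-loss state
        final_ssim = 0.0
        for iteration, ssim, _gia_result in attack.run_attack():
            trial.report(float(ssim), step=iteration)
            if trial.should_prune():      # abandon hopeless trials early
                raise optuna.TrialPruned()
            final_ssim = float(ssim)
        return final_ssim
    return objective

# study = optuna.create_study(direction="maximize")
# study.optimize(make_objective(attack_object), n_trials=50)
```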
131 changes: 50 additions & 81 deletions leakpro/attacks/gia_attacks/huang.py
@@ -2,8 +2,9 @@
from collections.abc import Generator
from copy import deepcopy
from dataclasses import dataclass, field
-from typing import Union
+from typing import Optional

+import optuna
import torch
from torch import Tensor
from torch.nn import CrossEntropyLoss, Module
@@ -12,8 +13,9 @@
from leakpro.attacks.gia_attacks.abstract_gia import AbstractGIA
from leakpro.fl_utils.data_utils import get_at_images
from leakpro.fl_utils.gia_optimizers import MetaSGD
-from leakpro.fl_utils.img_utils import MedianPool2d, dataloaders_ssim_ignite, l2_norm, total_variation
+from leakpro.fl_utils.gia_train import train
from leakpro.fl_utils.model_utils import BNFeatureHook
+from leakpro.fl_utils.similarity_measurements import cosine_similarity_weights, l2_norm, total_variation
from leakpro.metrics.attack_result import GIAResults
from leakpro.utils.import_helper import Callable, Self
from leakpro.utils.logger import logger
@@ -24,7 +26,7 @@ class HuangConfig:
"""Possible configs for the Inverting Gradients attack."""

# total variation scale for smoothing the reconstructions after each iteration
total_variation: float = 0.052
tv_reg: float = 0.052
# learning rate on the attack optimizer
attack_lr: float = 0.1
# iterations for the attack steps
@@ -41,34 +43,31 @@ class HuangConfig:
    bn_reg: float = 0.00016
    # l2 scale for discouraging high overall pixel intensity
    l2_scale: float = 0
+    # compare differences only for the 10 layers with the largest gradient norms; potentially good for larger models
+    top10norms: bool = False
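Since the config surface changes in this PR (`total_variation` becomes `tv_reg`, and `top10norms` is new), a short sketch of overriding the defaults, echoing the CIFAR-10 example above; the values here are illustrative only:

```python
# Illustrative overrides; the defaults above are the tuned values.
configs = HuangConfig()
configs.tv_reg = 1.0e-2       # renamed from total_variation in this PR
configs.top10norms = True     # compare only the 10 highest-norm layers

attack_object = Huang(base_model, client_loader, mean, std, configs=configs)
```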


class Huang(AbstractGIA):
    """Gradient inversion attack by Huang et al."""

-    def __init__(self: Self, model: Module, client_loader: DataLoader, train_fn: Callable,
-                 data_mean: Tensor, data_std: Tensor, configs: HuangConfig) -> None:
+    def __init__(self: Self, model: Module, client_loader: DataLoader, data_mean: Tensor, data_std: Tensor,
+                 train_fn: Optional[Callable] = None, configs: Optional[HuangConfig] = None) -> None:
        super().__init__()
-        self.model = model
+        self.original_model = model
+        self.model = deepcopy(self.original_model)
+        self.best_loss = float("inf")
+        self.best_reconstruction = None
+        self.best_reconstruction_round = None
+
+        self.configs = configs if configs is not None else HuangConfig()
+
        self.client_loader = client_loader
-        self.train_fn = train_fn
+        self.train_fn = train_fn if train_fn is not None else train
        self.data_mean = data_mean
        self.data_std = data_std
-        self.t_v_scale = configs.total_variation
-        self.attack_lr = configs.attack_lr
-        self.iterations = configs.at_iterations
-        self.optimizer = configs.optimizer
-        self.criterion = configs.criterion
-        self.epochs = configs.epochs
-        self.median_pooling = configs.median_pooling
-        self.bn_reg = configs.bn_reg
-        self.l2_scale = configs.l2_scale
-
-        self.best_loss = float("inf")
-        self.best_reconstruction = None
-        self.best_reconstruction_round = None
-        logger.info("Evaluating with Huang. et al initialized.")
        self.prepare_attack()
+        logger.info("Attack by Huang et al. initialized.")

    def description(self:Self) -> dict:
        """Return a description of the attack."""
@@ -103,7 +102,8 @@ def prepare_attack(self:Self) -> None:
            if isinstance(module, torch.nn.BatchNorm2d):
                self.loss_r_feature_layers.append(BNFeatureHook(module))
        # calculate running bn statistics and get client gradient
-        client_gradient = self.train_fn(self.model, self.client_loader, self.optimizer, self.criterion, self.epochs)
+        client_gradient = self.train_fn(self.model, self.client_loader,
+                                        self.configs.optimizer, self.configs.criterion, self.configs.epochs)

        # Stop updating running statistics
        for module in self.model.modules():
@@ -115,45 +115,17 @@ def prepare_attack(self:Self) -> None:
        self.client_gradient = [p.detach() for p in client_gradient]


-    def run_attack(self:Self) -> Union[GIAResults, Generator[tuple[int, Tensor]]]:
+    def run_attack(self:Self) -> Generator[tuple[int, Tensor, GIAResults]]:
        """Run the attack and return the combined metric result.

        Returns
        -------
            GIAResults: Container for results on GIA attacks.

        """
-        optimizer = torch.optim.Adam([self.reconstruction], lr=self.attack_lr)
-        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
-                                                         milestones=[self.iterations // 2.667, self.iterations // 1.6,
-                                                                     self.iterations // 1.142], gamma=0.1)
-        for i in range(self.iterations):
-            # loss function which does training and compares distance from reconstruction training to the real training.
-            closure = self.gradient_closure(optimizer)
-
-            loss = optimizer.step(closure)
-            scheduler.step()
-            with torch.no_grad():
-                # force pixels to be in reasonable ranges
-                self.reconstruction.data = torch.max(
-                    torch.min(self.reconstruction, (1 - self.data_mean) / self.data_std), -self.data_mean / self.data_std
-                )
-                if (i +1) % 500 == 0 and self.median_pooling:
-                    self.reconstruction.data = MedianPool2d(kernel_size=3, stride=1, padding=1, same=False)(self.reconstruction)
-            if i % 250 == 0:
-                logger.info(f"Iteration {i}, loss {loss}")
-                yield i, dataloaders_ssim_ignite(self.client_loader, self.reconstruction_loader)
-            # Chose image who has given least loss
-            if loss < self.best_loss:
-                self.best_loss = loss
-                self.best_reconstruction = deepcopy(self.reconstruction_loader)
-                self.best_reconstruction_round = i
-                logger.info(f"New best loss: {loss} on round: {i}")
-
-        return GIAResults(self.client_loader, self.best_reconstruction,
-                          ssim_score=dataloaders_ssim_ignite(self.client_loader, self.reconstruction_loader),
-                          data_mean=self.data_mean, data_std=self.data_std)
+        return self.generic_attack_loop(self.configs, self.gradient_closure, self.configs.at_iterations, self.reconstruction,
+                                        self.data_mean, self.data_std, self.configs.attack_lr, self.configs.median_pooling,
+                                        self.client_loader, self.reconstruction_loader)


    def gradient_closure(self: Self, optimizer: torch.optim.Optimizer) -> Callable:
@@ -172,46 +144,43 @@ def closure() -> torch.Tensor:
"""
optimizer.zero_grad()
self.model.zero_grad()
gradient = self.train_fn(self.model, self.reconstruction_loader, self.optimizer, self.criterion, self.epochs)
rec_loss = self.reconstruction_costs(gradient, self.client_gradient)
gradient = self.train_fn(self.model, self.reconstruction_loader, self.configs.optimizer,
self.configs.criterion, self.configs.epochs)
rec_loss = cosine_similarity_weights(gradient, self.client_gradient, self.configs.top10norms)

loss_r_feature = sum([
mod.r_feature
for (idx, mod) in enumerate(self.loss_r_feature_layers)
])

# Add the TV loss term to penalize large variations between pixels, encouraging smoother images.
rec_loss += self.t_v_scale * total_variation(self.reconstruction)
rec_loss += self.bn_reg * loss_r_feature
rec_loss += self.l2_scale * l2_norm(self.reconstruction)
rec_loss += self.configs.tv_reg * total_variation(self.reconstruction)
rec_loss += self.configs.bn_reg * loss_r_feature
rec_loss += self.configs.l2_scale * l2_norm(self.reconstruction)
rec_loss.backward()
self.reconstruction.grad.sign_()
return rec_loss
return closure

-    def reconstruction_costs(self: Self, client_gradient: torch.Tensor, reconstruction_gradient: torch.Tensor) -> torch.Tensor:
-        """Computes the reconstruction costs between client gradients and the reconstruction gradient.
-
-        This function calculates the pairwise costs between each client gradient and the reconstruction gradient
-        using the cosine similarity measure. The costs are accumulated and averaged over all client gradients.
+    def _configure_attack(self: Self, configs: dict) -> None:
+        pass

-        Returns
-        -------
-        torch.Tensor: The average reconstruction cost.
+    def suggest_parameters(self: Self, trial: optuna.trial.Trial) -> None:
+        """Suggest hyperparameters and their ranges to optimize for the Huang attack."""
+        total_variation = trial.suggest_float("total_variation", 1e-6, 1e-1, log=True)
+        bn_reg = trial.suggest_float("bn_reg", 1e-4, 1e-1, log=True)
+        self.configs.tv_reg = total_variation
+        self.configs.bn_reg = bn_reg

-        """
-        with torch.no_grad():
-            indices = torch.arange(len(reconstruction_gradient))
-            filtered_trial_gradients = [reconstruction_gradient[i] for i in indices]
-            filtered_input_gradients = [client_gradient[i] for i in indices]
-            costs = sum((x * y).sum() for x, y in zip(filtered_input_gradients,
-                                                      filtered_trial_gradients))
-
-            trial_norm = sum(x.pow(2).sum()
-                             for x in filtered_trial_gradients).sqrt()
-            input_norm = sum(y.pow(2).sum()
-                             for y in filtered_input_gradients).sqrt()
-            return 1 - (costs / trial_norm / input_norm)
+    def reset_attack(self: Self) -> None:
+        """Reset the attack to its initial state."""
+        self.best_loss = float("inf")
+        self.best_reconstruction = None
+        self.best_reconstruction_round = None
+        self.model = deepcopy(self.original_model)
+        self.prepare_attack()
+        logger.info("Huang attack reset to initial state.")

+    def get_configs(self: Self) -> dict:
+        """Return the configs used for the attack."""
+        return self.configs
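The deleted `reconstruction_costs` method is replaced by `cosine_similarity_weights` from `leakpro.fl_utils.similarity_measurements`, now with a `top10norms` option. A sketch of what that function plausibly computes, reconstructed from the deleted code above; the `top10norms` branch is an assumption based on the config comment, not the actual library source:

```python
import torch


def cosine_similarity_weights(client_gradient: list, reconstruction_gradient: list,
                              top10norms: bool = False) -> torch.Tensor:
    """1 - cosine similarity between two lists of per-layer gradients.

    Illustrative reconstruction of the deleted reconstruction_costs code;
    the top10norms branch is an assumption from the config comment.
    """
    with torch.no_grad():
        if top10norms:
            # keep only the 10 layers with the largest client-gradient norms
            norms = torch.stack([g.norm() for g in client_gradient])
            indices = torch.topk(norms, k=min(10, len(client_gradient))).indices
        else:
            indices = torch.arange(len(client_gradient))
    # gradient-carrying math stays outside no_grad so rec_loss.backward() works
    trial = [reconstruction_gradient[i] for i in indices]
    inputs = [client_gradient[i] for i in indices]
    costs = sum((x * y).sum() for x, y in zip(inputs, trial))
    trial_norm = sum(x.pow(2).sum() for x in trial).sqrt()
    input_norm = sum(y.pow(2).sum() for y in inputs).sqrt()
    return 1 - (costs / trial_norm / input_norm)
```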