From 578832dedbec8a115552f57c3bfd05fa13e4f915 Mon Sep 17 00:00:00 2001
From: damjimenezgu
Date: Mon, 17 Jun 2024 11:36:31 +0200
Subject: [PATCH] include functions non-iid-ness metrics in utils.py

---
 datasets/flwr_datasets/metrics/utils.py | 236 +++++++++++++++++++++++-
 1 file changed, 235 insertions(+), 1 deletion(-)

diff --git a/datasets/flwr_datasets/metrics/utils.py b/datasets/flwr_datasets/metrics/utils.py
index 8f78b2fd4c32..5f7debe9b22a 100644
--- a/datasets/flwr_datasets/metrics/utils.py
+++ b/datasets/flwr_datasets/metrics/utils.py
@@ -15,9 +15,11 @@
 """Utils for metrics computation."""
 
 
+import math
 import warnings
-from typing import List, Optional, Union
+from typing import Any, List, Optional, Tuple, Union
 
+import numpy as np
 import pandas as pd
 
 from flwr_datasets.partitioner import Partitioner
@@ -261,3 +263,235 @@ def _compute_frequencies(
         return frequencies
     frequencies = counts.divide(len(labels))
    return frequencies
+
+
+def compute_counts(
+    labels: Union[List[int], List[str]], unique_labels: Union[List[int], List[str]]
+) -> pd.Series:
+    """Compute the count of labels when taking into account all possible labels.
+
+    Also known as absolute frequency.
+
+    Parameters
+    ----------
+    labels: Union[List[int], List[str]]
+        The labels from the datasets.
+    unique_labels: Union[List[int], List[str]]
+        The reference list of all unique labels. Needed so that labels absent from
+        `labels` are still included, with a count equal to zero.
+
+    Returns
+    -------
+    label_counts: pd.Series
+        The pd.Series with labels as indices and counts as values.
+    """
+    if len(unique_labels) != len(set(unique_labels)):
+        raise ValueError("unique_labels must contain unique elements only.")
+    labels_series = pd.Series(labels)
+    label_counts = labels_series.value_counts()
+    label_counts_with_zeros = pd.Series(index=unique_labels, data=0)
+    label_counts_with_zeros = label_counts_with_zeros.add(
+        label_counts, fill_value=0
+    ).astype(int)
+    return label_counts_with_zeros
+
+
+def compute_frequency(
+    labels: Union[List[int], List[str]], unique_labels: Union[List[int], List[str]]
+) -> pd.Series:
+    """Compute the distribution of labels when taking into account all possible labels.
+
+    Also known as relative frequency.
+
+    Parameters
+    ----------
+    labels: Union[List[int], List[str]]
+        The labels from the datasets.
+    unique_labels: Union[List[int], List[str]]
+        The reference list of all unique labels. Needed so that labels absent from
+        `labels` are still included, with a frequency equal to zero.
+
+    Returns
+    -------
+    The pd.Series with labels as indices and probabilities as values.
+    """
+    counts = compute_counts(labels, unique_labels)
+    if len(labels) == 0:
+        counts = counts.astype(float)
+        return counts
+    counts = counts.divide(len(labels))
+    return counts
+
+
+def get_distros(
+    targets_per_client: List[List[Any]], num_bins: int = 0
+) -> List[List[float]]:
+    """Get the distributions (percentages) for multiple clients' targets.
+
+    Parameters
+    ----------
+    targets_per_client : list of lists, array-like
+        Targets (labels) for each client (local node).
+    num_bins : int
+        Number of bins used to bin the targets when the task is 'regression'.
+
+    Returns
+    -------
+    distributions: list of lists, array-like
+        Distributions (percentages) of the clients' targets.
+    """
+    # Flatten targets array
+    targets = np.concatenate(targets_per_client)
+
+    # Bin targets for regression tasks
+    if num_bins > 0:
+        targets_per_client, targets = bin_targets_per_client(
+            targets, targets_per_client, num_bins
+        )
+
+    # Get unique classes and counts
+    unique_classes, _ = np.unique(targets, return_counts=True)
+
+    # Calculate distribution (percentage) for each client
+    distributions = []
+    for client_targets in targets_per_client:
+        # Count occurrences of each unique class in client's targets
+        client_counts = np.bincount(
+            np.searchsorted(unique_classes, client_targets),
+            minlength=len(unique_classes),
+        )
+        # Get percentages
+        client_percentage = client_counts / len(client_targets)
+        distributions.append(client_percentage.tolist())
+
+    return distributions
+
+
+def bin_targets(
+    targets: Union[np.ndarray[Any, np.dtype[Any]], List[Any]], num_bins: int
+) -> Tuple[np.ndarray[Any, np.dtype[np.float64]], np.ndarray[Any, np.dtype[np.int64]]]:
+    """Bin the targets.
+
+    Parameters
+    ----------
+    targets : array-like
+        Targets (labels) variable.
+
+    num_bins : int
+        Number of bins used to bin the targets when the task is 'regression'.
+
+    Returns
+    -------
+    bins: np.ndarray
+        The computed bin edges.
+    binned_targets: np.ndarray
+        The binned targets.
+    """
+    # Compute bins
+    bins = np.linspace(min(targets), max(targets), num_bins + 1)
+    # Bin the targets
+    binned_targets = np.digitize(targets, bins)
+    return bins, binned_targets
+
+
+def bin_targets_per_client(
+    targets: Union[np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[np.int64]]],
+    targets_per_client: Union[List[List[Union[int, str, bool]]], List[List[int]]],
+    num_bins: int,
+) -> Tuple[List[List[Any]], np.ndarray[Any, np.dtype[np.int64]]]:
+    """Bin the targets of each client.
+
+    Parameters
+    ----------
+    targets : array-like
+        Targets (labels) variable.
+    targets_per_client : list of lists, array-like
+        Targets (labels) for each client (local node).
+    num_bins : int
+        Number of bins used to bin the targets when the task is 'regression'.
+
+    Returns
+    -------
+    binned_targets_per_client: list
+        Binned targets of each client.
+    binned_targets: np.ndarray
+        The binned targets.
+    """
+    # Bin targets
+    bins, binned_targets = bin_targets(targets, num_bins)
+    # Bin each client's targets using the calculated bins
+    binned_targets_per_client = []
+    for client_targets in targets_per_client:
+        binned_client_targets = list(np.digitize(np.array(client_targets), bins))
+        binned_targets_per_client.append(binned_client_targets)
+    return binned_targets_per_client, binned_targets
+
+
+def is_type(lst: List[List[Any]], data_type: Any) -> bool:
+    """Check whether any value in the nested lists is of the given type.
+
+    Parameters
+    ----------
+    lst : list of lists, array-like
+        Targets (labels) for each client (local node).
+    data_type : Python data type
+        Desired data type to check for.
+    """
+    if data_type == int:
+        return any(isinstance(item, int) for sublist in lst for item in sublist)
+    elif data_type == float:
+        return any(isinstance(item, float) for sublist in lst for item in sublist)
+    elif data_type == str:
+        return any(isinstance(item, str) for sublist in lst for item in sublist)
+    elif data_type == bool:
+        return any(isinstance(item, bool) for sublist in lst for item in sublist)
+    else:
+        raise ValueError(
+            "Unsupported data type. Please choose from int, float, str, or bool."
+        )
+
+
+def entropy(
+    distribution: Union[np.ndarray[Any, np.dtype[Any]], List[float]],
+    normalize: bool = True,
+) -> Any:
+    """Calculate the entropy of a distribution.
+
+    Parameters
+    ----------
+    distribution : array-like
+        Distribution (percentages) of targets of a single local node (client).
+    normalize : bool
+        Flag to normalize the entropy.
+
+    Returns
+    -------
+    entropy_value: float
+        Entropy.
+    """
+    entropy_value = -sum(p * math.log2(p) for p in distribution if p != 0)
+    if normalize:
+        max_entropy = math.log2(np.array(distribution).shape[0])
+        return entropy_value / max_entropy
+    return entropy_value
+
+
+def normalize_value(value: float, min_value: float = 0, max_value: float = 1) -> float:
+    """Scale (normalize) the input value between min_value and max_value.
+
+    Parameters
+    ----------
+    value : float
+        Value to be normalized.
+    min_value : float
+        Minimum bound of normalization.
+    max_value : float
+        Maximum bound of normalization.
+
+    Returns
+    -------
+    value_normalized: float
+        Normalized value between min_value and max_value.
+    """
+    value_normalized = (value - min_value) / (max_value - min_value)
+    return value_normalized
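
Usage note (not part of the diff): a minimal sketch of how the helpers added above could be combined to quantify label skew across clients once the patch is applied. The client target lists are made-up illustration data; `get_distros`, `entropy`, and `is_type` are the functions introduced by this patch, imported from the module it touches.

# Minimal usage sketch (assumes the patch above is applied; the client target
# lists below are made-up illustration data).
from flwr_datasets.metrics.utils import entropy, get_distros, is_type

targets_per_client = [
    [0, 0, 1, 1, 2],  # fairly balanced client
    [0, 0, 0, 0, 1],  # skewed towards class 0
    [2, 2, 2, 2, 2],  # single-class client
]

# Integer targets -> classification task, so no binning (num_bins stays 0).
assert is_type(targets_per_client, int)
distributions = get_distros(targets_per_client)

# Normalized entropy per client: 1.0 means perfectly uniform labels,
# 0.0 means all samples share a single class.
for cid, dist in enumerate(distributions):
    print(f"client {cid}: normalized entropy = {entropy(dist):.3f}")

# For continuous (regression) targets, pass num_bins to discretize them first.
regression_targets_per_client = [[0.1, 0.2, 0.9], [0.4, 0.5, 0.6]]
regression_distros = get_distros(regression_targets_per_client, num_bins=2)

Normalizing the entropy by log2 of the number of classes, as the added `entropy` function does by default, keeps the per-client values comparable across datasets with different numbers of labels.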