Rename node_id to partition_id in Flower Datasets #3129

Merged
merged 15 commits on Mar 13, 2024
2 changes: 1 addition & 1 deletion datasets/README.md
@@ -59,7 +59,7 @@ If you plan to change the type of the dataset to run the code with your ML frame

# Usage

Flower Datasets exposes the `FederatedDataset` abstraction to represent the dataset needed for federated learning/evaluation/analytics. It has two powerful methods that let you handle the dataset preprocessing: `load_partition(node_id, split)` and `load_full(split)`.
Flower Datasets exposes the `FederatedDataset` abstraction to represent the dataset needed for federated learning/evaluation/analytics. It has two powerful methods that let you handle the dataset preprocessing: `load_partition(partition_id, split)` and `load_full(split)`.

Here's a basic quickstart example of how to partition the MNIST dataset:
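A minimal sketch, assuming MNIST with a 100-partition IID setup as illustrative choices:

```python
from flwr_datasets import FederatedDataset

# Illustrative setup: MNIST split into 100 IID partitions.
fds = FederatedDataset(dataset="mnist", partitioners={"train": 100})

# Load a single client's partition by its partition_id.
partition = fds.load_partition(partition_id=0, split="train")

# Load the whole (non-partitioned) test split for centralized evaluation.
centralized = fds.load_full("test")
```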

6 changes: 3 additions & 3 deletions datasets/flwr_datasets/federated_dataset.py
@@ -138,7 +138,7 @@ def __init__(

def load_partition(
self,
node_id: int,
partition_id: int,
split: Optional[str] = None,
) -> Union[Dataset, List[Dataset], DatasetDict]:
"""Load the partition specified by the idx in the selected split.
@@ -148,7 +148,7 @@ def load_partition(

Parameters
----------
node_id : int
partition_id : int
Partition index for the selected split, idx in {0, ..., num_partitions - 1}.
split : Optional[str]
Name of the (partitioned) split (e.g. "train", "test"). You can skip this
@@ -179,7 +179,7 @@ def load_partition(
self._check_if_split_possible_to_federate(split)
partitioner: Partitioner = self._partitioners[split]
self._assign_dataset_to_partitioner(split)
partition = partitioner.load_partition(node_id)
partition = partitioner.load_partition(partition_id)
if self._partition_division is None:
return partition
partition_division = self._partition_division.get(split)
78 changes: 43 additions & 35 deletions datasets/flwr_datasets/partitioner/dirichlet_partitioner.py
@@ -25,7 +25,7 @@
from flwr_datasets.partitioner.partitioner import Partitioner


# pylint: disable=R0902, R0912
# pylint: disable=R0902, R0912, R0914
class DirichletPartitioner(Partitioner):
"""Partitioner based on Dirichlet distribution.

@@ -39,10 +39,10 @@ class DirichletPartitioner(Partitioner):
even though the alpha stays the same).

The notion of balancing is explicitly introduced here (not mentioned in paper but
implemented in the code). It is a mechanism that excludes the node from
assigning new samples to it if the current number of samples on that node exceeds
the average number that the node would get in case of even data distribution.
It is controlled by`self_balancing` parameter.
implemented in the code). It is a mechanism that excludes a partition from
receiving new samples if its current number of samples exceeds the average
number that the partition would get under an even data distribution. It is
controlled by the `self_balancing` parameter.

Parameters
----------
@@ -61,7 +61,7 @@ class DirichletPartitioner(Partitioner):
paper's code although not mentioned in the paper itself).
shuffle: bool
Whether to randomize the order of samples. Shuffling is applied after the
samples assignment to nodes.
assignment of samples to partitions.
seed: int
Seed used for dataset shuffling. It has no effect if `shuffle` is False.

@@ -78,7 +78,9 @@ class DirichletPartitioner(Partitioner):
>>> print(partition[0]) # Print the first example
{'image': <PIL.PngImagePlugin.PngImageFile image mode=L size=28x28 at 0x127B92170>,
'label': 4}
>>> partition_sizes = [len(fds.load_partition(node_id)) for node_id in range(10)]
>>> partition_sizes = [
>>> len(fds.load_partition(partition_id)) for partition_id in range(10)
>>> ]
>>> print(sorted(partition_sizes))
[2134, 2615, 3646, 6011, 6170, 6386, 6715, 7653, 8435, 10235]
"""
@@ -107,17 +109,17 @@ def __init__( # pylint: disable=R0913

# Utility attributes
# The attributes below are determined during the first call to load_partition
self._avg_num_of_samples_per_node: Optional[float] = None
self._avg_num_of_samples_per_partition: Optional[float] = None
self._unique_classes: Optional[Union[List[int], List[str]]] = None
self._node_id_to_indices: Dict[int, List[int]] = {}
self._node_id_to_indices_determined = False
self._partition_id_to_indices: Dict[int, List[int]] = {}
self._partition_id_to_indices_determined = False

def load_partition(self, node_id: int) -> datasets.Dataset:
def load_partition(self, partition_id: int) -> datasets.Dataset:
"""Load a partition based on the partition index.

Parameters
----------
node_id : int
partition_id : int
the index that corresponds to the requested partition

Returns
@@ -129,14 +131,14 @@ def load_partition(self, node_id: int) -> datasets.Dataset:
# requested. Only the first call creates the indices assignments for all the
# partition indices.
self._check_num_partitions_correctness_if_needed()
self._determine_node_id_to_indices_if_needed()
return self.dataset.select(self._node_id_to_indices[node_id])
self._determine_partition_id_to_indices_if_needed()
return self.dataset.select(self._partition_id_to_indices[partition_id])

@property
def num_partitions(self) -> int:
"""Total number of partitions."""
self._check_num_partitions_correctness_if_needed()
self._determine_node_id_to_indices_if_needed()
self._determine_partition_id_to_indices_if_needed()
return self._num_partitions

def _initialize_alpha(
@@ -192,16 +194,20 @@ def _initialize_alpha(
)
return alpha

def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914
def _determine_partition_id_to_indices_if_needed(
self,
) -> None:
"""Create an assignment of indices to the partition indices."""
if self._node_id_to_indices_determined:
if self._partition_id_to_indices_determined:
return

# Generate information needed for Dirichlet partitioning
self._unique_classes = self.dataset.unique(self._partition_by)
assert self._unique_classes is not None
# This is needed only if self._self_balancing is True (the default option)
self._avg_num_of_samples_per_node = self.dataset.num_rows / self._num_partitions
self._avg_num_of_samples_per_partition = (
self.dataset.num_rows / self._num_partitions
)

# Change targets list data type to numpy
targets = np.array(self.dataset[self._partition_by])
@@ -210,10 +216,10 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0
# min_partition_size is reached.
sampling_try = 0
while True:
# Prepare data structure to store indices assigned to node ids
node_id_to_indices: Dict[int, List[int]] = {}
# Prepare data structure to store indices assigned to partition ids
partition_id_to_indices: Dict[int, List[int]] = {}
for nid in range(self._num_partitions):
node_id_to_indices[nid] = []
partition_id_to_indices[nid] = []

# Iterate over all unique labels (they are not necessarily of type int)
for k in self._unique_classes:
@@ -228,16 +234,16 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0
nid
]
# Balancing (not mentioned in the paper but implemented)
# Do not assign additional samples to the node if it already has more
# than the average numbers of samples per partition. Note that it might
# especially affect classes that are later in the order. This is the
# reason for more sparse division that the alpha might suggest.
# Do not assign additional samples to the partition if it already has
# more than the average number of samples per partition. Note that this
# might especially affect classes that are later in the order. This is
# the reason for a sparser division than the alpha might suggest.
if self._self_balancing:
assert self._avg_num_of_samples_per_node is not None
assert self._avg_num_of_samples_per_partition is not None
for nid in nid_to_proportion_of_k_samples.copy():
if (
len(node_id_to_indices[nid])
> self._avg_num_of_samples_per_node
len(partition_id_to_indices[nid])
> self._avg_num_of_samples_per_partition
):
nid_to_proportion_of_k_samples[nid] = 0

@@ -262,18 +268,20 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0
)

# Append new indices (coming from class k) to the existing indices
for nid, indices in node_id_to_indices.items():
for nid, indices in partition_id_to_indices.items():
indices.extend(split_indices[nid].tolist())

# Determine if the indices assignment meets the min_partition_size.
# If it does not meet the requirement, repeat the Dirichlet sampling process.
# Otherwise, break the while loop.
min_sample_size_on_client = min(
len(indices) for indices in node_id_to_indices.values()
len(indices) for indices in partition_id_to_indices.values()
)
if min_sample_size_on_client >= self._min_partition_size:
break
sample_sizes = [len(indices) for indices in node_id_to_indices.values()]
sample_sizes = [
len(indices) for indices in partition_id_to_indices.values()
]
alpha_not_met = [
self._alpha[i]
for i, ss in enumerate(sample_sizes)
@@ -309,15 +317,15 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0
# Shuffle the indices so that the datasets do not have targets in sequences
# like [00000, 11111, ...] if shuffle is True
if self._shuffle:
for indices in node_id_to_indices.values():
for indices in partition_id_to_indices.values():
# In place shuffling
self._rng.shuffle(indices)
self._node_id_to_indices = node_id_to_indices
self._node_id_to_indices_determined = True
self._partition_id_to_indices = partition_id_to_indices
self._partition_id_to_indices_determined = True

def _check_num_partitions_correctness_if_needed(self) -> None:
"""Test num_partitions when the dataset is given (in load_partition)."""
if not self._node_id_to_indices_determined:
if not self._partition_id_to_indices_determined:
if self._num_partitions > self.dataset.num_rows:
raise ValueError(
"The number of partitions needs to be smaller than the number of "
@@ -77,7 +77,9 @@ def test_valid_initialization(
def test_min_partition_size_requirement(self) -> None:
"""Test if partitions are created with min partition size required."""
_, partitioner = _dummy_setup(3, 0.5, 100, "labels")
partition_list = [partitioner.load_partition(node_id) for node_id in [0, 1, 2]]
partition_list = [
partitioner.load_partition(partition_id) for partition_id in [0, 1, 2]
]
self.assertTrue(
all(len(p) > partitioner._min_partition_size for p in partition_list)
)
@@ -87,14 +89,14 @@ def test_alpha_in_ndarray_initialization(self) -> None:
_, partitioner = _dummy_setup(3, np.array([1.0, 1.0, 1.0]), 100, "labels")
self.assertTrue(np.all(partitioner._alpha == np.array([1.0, 1.0, 1.0])))

def test__determine_node_id_to_indices(self) -> None:
def test__determine_partition_id_to_indices(self) -> None:
"""Test the determine_nod_id_to_indices matches the flag after the call."""
num_partitions, alpha, num_rows, partition_by = 3, 0.5, 100, "labels"
_, partitioner = _dummy_setup(num_partitions, alpha, num_rows, partition_by)
partitioner._determine_node_id_to_indices_if_needed()
partitioner._determine_partition_id_to_indices_if_needed()
self.assertTrue(
partitioner._node_id_to_indices_determined
and len(partitioner._node_id_to_indices) == num_partitions
partitioner._partition_id_to_indices_determined
and len(partitioner._partition_id_to_indices) == num_partitions
)


@@ -21,15 +21,15 @@


class ExponentialPartitioner(SizePartitioner):
"""Partitioner creates partitions of size that are correlated with exp(node_id).
"""Partitioner creates partitions of size that are correlated with exp(id).

The amount of data each client gets is correlated with the exponent of partition ID.
For instance, if the IDs range from 1 to M, client with ID 1 gets e units of
data, client 2 gets e^2 units, and so on, up to client M which gets e^M units.
The floor operation is applied to each of these numbers, so floor(2.71...) = 2
and, since e^2 ~ 7.39, floor(7.39) = 7. Each number is always rounded down,
i.e. the fractional part is cut off. The remainder of these unassigned
(fractional) samples is added to the
biggest partition (the one with the biggest node_id).
biggest partition (the one with the biggest partition_id).

Parameters
----------
@@ -38,6 +38,6 @@ class ExponentialPartitioner(SizePartitioner):
"""

def __init__(self, num_partitions: int) -> None:
super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.exp)
super().__init__(num_partitions=num_partitions, partition_id_to_size_fn=np.exp)
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")
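As a rough illustration of the sizing rule described in the docstring above (a hypothetical standalone computation, not the library's internal `SizePartitioner` logic; the row and partition counts are made up):

```python
import numpy as np

# Proportions follow exp(1), exp(2), ..., exp(M); each size is floored;
# the leftover rows go to the biggest partition (largest partition_id).
num_rows, num_partitions = 60_000, 5
weights = np.exp(np.arange(1, num_partitions + 1))
sizes = np.floor(num_rows * weights / weights.sum()).astype(int)
sizes[-1] += num_rows - sizes.sum()  # remainder added to the last partition
print(sizes.tolist(), int(sizes.sum()))  # sizes grow roughly e-fold; total is 60000
```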
6 changes: 3 additions & 3 deletions datasets/flwr_datasets/partitioner/iid_partitioner.py
@@ -34,12 +34,12 @@ def __init__(self, num_partitions: int) -> None:
raise ValueError("The number of partitions must be greater than zero.")
self._num_partitions = num_partitions

def load_partition(self, node_id: int) -> datasets.Dataset:
def load_partition(self, partition_id: int) -> datasets.Dataset:
"""Load a single IID partition based on the partition index.

Parameters
----------
node_id : int
partition_id : int
the index that corresponds to the requested partition

Returns
@@ -48,7 +48,7 @@ def load_partition(self, node_id: int) -> datasets.Dataset:
single dataset partition
"""
return self.dataset.shard(
num_shards=self._num_partitions, index=node_id, contiguous=True
num_shards=self._num_partitions, index=partition_id, contiguous=True
)

@property
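Finally, a hedged usage sketch of the renamed `IidPartitioner` API (the standalone `partitioner.dataset` setter and the MNIST dataset are assumptions for illustration):

```python
from datasets import load_dataset
from flwr_datasets.partitioner import IidPartitioner

# Illustrative setup: 10 contiguous, equally sized shards of MNIST.
dataset = load_dataset("mnist", split="train")
partitioner = IidPartitioner(num_partitions=10)
partitioner.dataset = dataset  # attach the dataset to be partitioned
partition = partitioner.load_partition(partition_id=3)
print(len(partition))  # roughly len(dataset) / 10 rows
```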