Rename any node to partition
adam-narozniak committed Mar 13, 2024
1 parent 7ac506a commit d374bc4
Showing 4 changed files with 48 additions and 49 deletions.
22 changes: 11 additions & 11 deletions datasets/flwr_datasets/partitioner/dirichlet_partitioner.py
@@ -39,10 +39,10 @@ class DirichletPartitioner(Partitioner):
even though the alpha stays the same).
The notion of balancing is explicitly introduced here (not mentioned in paper but
implemented in the code). It is a mechanism that excludes the node from
assigning new samples to it if the current number of samples on that node exceeds
the average number that the node would get in case of even data distribution.
It is controlled by`self_balancing` parameter.
implemented in the code). It is a mechanism that excludes the partition from
assigning new samples to it if the current number of samples on that partition
exceeds the average number that the partition would get in case of even data
distribution. It is controlled by`self_balancing` parameter.
Parameters
----------
@@ -61,7 +61,7 @@ class DirichletPartitioner(Partitioner):
paper's code although not mentioned in paper itself).
shuffle: bool
Whether to randomize the order of samples. Shuffling applied after the
samples assignment to nodes.
samples assignment to partitions.
seed: int
Seed used for dataset shuffling. It has no effect if `shuffle` is False.
@@ -109,7 +109,7 @@ def __init__( # pylint: disable=R0913

# Utility attributes
# The attributes below are determined during the first call to load_partition
self._avg_num_of_samples_per_node: Optional[float] = None
self._avg_num_of_samples_per_partition: Optional[float] = None
self._unique_classes: Optional[Union[List[int], List[str]]] = None
self._partition_id_to_indices: Dict[int, List[int]] = {}
self._partition_id_to_indices_determined = False
@@ -205,7 +205,7 @@ def _determine_partition_id_to_indices_if_needed(
self._unique_classes = self.dataset.unique(self._partition_by)
assert self._unique_classes is not None
# This is needed only if self._self_balancing is True (the default option)
self._avg_num_of_samples_per_node = self.dataset.num_rows / self._num_partitions
self._avg_num_of_samples_per_partition = self.dataset.num_rows / self._num_partitions

# Change targets list data type to numpy
targets = np.array(self.dataset[self._partition_by])
@@ -214,7 +214,7 @@ def _determine_partition_id_to_indices_if_needed(
# min_partition_size is reached.
sampling_try = 0
while True:
# Prepare data structure to store indices assigned to node ids
# Prepare data structure to store indices assigned to partition ids
partition_id_to_indices: Dict[int, List[int]] = {}
for nid in range(self._num_partitions):
partition_id_to_indices[nid] = []
@@ -232,16 +232,16 @@ def _determine_partition_id_to_indices_if_needed(
nid
]
# Balancing (not mentioned in the paper but implemented)
# Do not assign additional samples to the node if it already has more
# Do not assign additional samples to the partition if it already has more
# than the average numbers of samples per partition. Note that it might
# especially affect classes that are later in the order. This is the
# reason for more sparse division that the alpha might suggest.
if self._self_balancing:
assert self._avg_num_of_samples_per_node is not None
assert self._avg_num_of_samples_per_partition is not None
for nid in nid_to_proportion_of_k_samples.copy():
if (
len(partition_id_to_indices[nid])
> self._avg_num_of_samples_per_node
> self._avg_num_of_samples_per_partition
):
nid_to_proportion_of_k_samples[nid] = 0
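The self-balancing mechanism described in the docstring above amounts to zeroing out the Dirichlet proportion of any partition that already holds more than the per-partition average, then renormalizing before the class is assigned. A minimal illustrative sketch, not taken from this diff (the three-partition setup, the proportions, and the 1,000-sample dataset are assumptions):

>>> import numpy as np
>>> proportions = np.array([0.5, 0.3, 0.2])  # Dirichlet draw for one class (assumed values)
>>> samples_per_partition = np.array([600, 300, 100])  # samples already assigned per partition
>>> avg_samples_per_partition = 1000 / 3  # dataset size / number of partitions
>>> # Exclude partitions that already exceed the average, as the balancing step above does
>>> proportions[samples_per_partition > avg_samples_per_partition] = 0
>>> proportions = proportions / proportions.sum()  # renormalize the remaining proportions
>>> proportions.tolist()
[0.0, 0.6, 0.4]

Classes drawn later in the loop see fewer eligible partitions, which is why the resulting split can be sparser than the alpha alone would suggest.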

11 changes: 5 additions & 6 deletions datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py
@@ -49,7 +49,7 @@ class InnerDirichletPartitioner(Partitioner): # pylint: disable=R0902
number of unique classes)
shuffle: bool
Whether to randomize the order of samples. Shuffling applied after the
samples assignment to nodes.
samples assignment to partitions.
seed: int
Seed used for dataset shuffling. It has no effect if `shuffle` is False.
@@ -91,7 +91,6 @@ def __init__( # pylint: disable=R0913
self._num_unique_classes: Optional[int] = None
self._num_partitions = len(self._partition_sizes)

# self._avg_num_of_samples_per_node: Optional[float] = None
self._partition_id_to_indices: Dict[int, List[int]] = {}
self._partition_id_to_indices_determined = False

@@ -211,18 +210,18 @@ def _determine_partition_id_to_indices_if_needed(
for cid in range(self._num_partitions)
]

# Node id to number of sample left for allocation for that node id
# Node id to number of sample left for allocation for that partition id
partition_id_to_left_to_allocate = dict(
zip(range(self._num_partitions), self._partition_sizes)
)

not_full_partition_ids = list(range(self._num_partitions))
while np.sum(list(partition_id_to_left_to_allocate.values())) != 0:
# Choose a node
# Choose a partition
current_partition_id = self._rng.choice(not_full_partition_ids)
# If current node is full resample a client
# If current partition is full resample a client
if partition_id_to_left_to_allocate[current_partition_id] == 0:
# When the node is full, exclude it from the sampling nodes list
# When the partition is full, exclude it from the sampling partitions list
not_full_partition_ids.pop(
not_full_partition_ids.index(current_partition_id)
)
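A stripped-down sketch of the allocation loop above: partitions are drawn at random, a partition that has reached its target size is removed from the sampling list, and every other draw receives the next sample index. The target sizes are assumptions and the class-proportion logic of the real partitioner is omitted:

>>> import numpy as np
>>> rng = np.random.default_rng(seed=42)
>>> partition_sizes = [3, 2]  # assumed target number of samples per partition
>>> left_to_allocate = dict(enumerate(partition_sizes))
>>> assigned = {pid: [] for pid in left_to_allocate}
>>> not_full_partition_ids = list(left_to_allocate)
>>> next_sample = 0
>>> while sum(left_to_allocate.values()) > 0:
...     pid = int(rng.choice(not_full_partition_ids))  # choose a partition at random
...     if left_to_allocate[pid] == 0:  # full partition: exclude it from sampling
...         not_full_partition_ids.remove(pid)
...         continue
...     assigned[pid].append(next_sample)  # assign the next sample index to it
...     left_to_allocate[pid] -= 1
...     next_sample += 1
>>> [len(assigned[pid]) for pid in sorted(assigned)]
[3, 2]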
58 changes: 29 additions & 29 deletions datasets/flwr_datasets/partitioner/shard_partitioner.py
@@ -31,7 +31,7 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902
The algorithm works as follows: the dataset is sorted by label e.g. [samples with
label 1, samples with labels 2 ...], then the shards are created, with each
shard of size = `shard_size` if provided or automatically calculated:
shards_size = len(dataset) / `num_partitions` * `num_shards_per_node`.
shards_size = len(dataset) / `num_partitions` * `num_shards_per_partition`.
A shard is just a block (chunk) of a `dataset` that contains `shard_size`
consecutive samples. There might be shards that contain samples associated with more
@@ -42,17 +42,17 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902
has samples with more than one unique label is when the shard size is bigger than
the number of samples of a certain class.
Each partition is created from `num_shards_per_node` that are chosen randomly.
Each partition is created from `num_shards_per_partition` that are chosen randomly.
There are a few ways of partitioning data that result in certain properties
(depending on the parameters specification):
1) same number of shards per nodes + the same shard size (specify:
a) `num_shards_per_nodes`, `shard_size`; or b) `num_shards_per_node`)
1) same number of shards per partitions + the same shard size (specify:
a) `num_shards_per_partitions`, `shard_size`; or b) `num_shards_per_partition`)
In case of b the `shard_size` is calculated as floor(len(dataset) /
(`num_shards_per_nodes` * `num_partitions`))
2) possibly different number of shards per node (use nearly all data) + the same
(`num_shards_per_partitions` * `num_partitions`))
2) possibly different number of shards per partition (use nearly all data) + the same
shard size (specify: `shard_size` + `keep_incomplete_shard=False`)
3) possibly different number of shards per node (use all data) + possibly different
3) possibly different number of shards per partition (use all data) + possibly different
shard size (specify: `shard_size` + `keep_incomplete_shard=True`)
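A worked example of the shard-size arithmetic for case 1b above, with assumed numbers (a 60,000-sample dataset, 10 partitions, 2 shards per partition):

>>> dataset_len, num_partitions, num_shards_per_partition = 60_000, 10, 2
>>> shard_size = dataset_len // (num_shards_per_partition * num_partitions)  # floor division
>>> shard_size
3000
>>> shard_size * num_shards_per_partition  # resulting samples per partition
6000

In cases 2 and 3 the shard size is given explicitly instead, and `keep_incomplete_shard` decides whether the leftover block smaller than `shard_size` is kept or dropped.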
@@ -79,16 +79,16 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902
others). If it is dropped each shard is equal size. (It does not mean that each
client gets equal number of shards, which only happens if
`num_partitions` % `num_shards` = 0). This parameter has no effect if
`num_shards_per_nodes` and `shard_size` are specified.
`num_shards_per_partitions` and `shard_size` are specified.
shuffle: bool
Whether to randomize the order of samples. Shuffling applied after the
samples assignment to nodes.
samples assignment to partitions.
seed: int
Seed used for dataset shuffling. It has no effect if `shuffle` is False.
Examples
--------
1) If you need same number of shards per nodes + the same shard size (and you know
1) If you need same number of shards per partitions + the same shard size (and you know
both of these values)
>>> from flwr_datasets import FederatedDataset
>>> from flwr_datasets.partitioner import ShardPartitioner
@@ -107,7 +107,7 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902
[2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000]
2) If you want to use nearly all the data and do not need to have the number of
shard per each node to be the same
shard per each partition to be the same
>>> from flwr_datasets import FederatedDataset
>>> from flwr_datasets.partitioner import ShardPartitioner
>>>
@@ -150,7 +150,7 @@ def __init__( # pylint: disable=R0913
self._num_partitions = num_partitions
self._partition_by = partition_by
_check_if_natual_number(num_shards_per_partition, "num_shards_per_partition", True)
self._num_shards_per_node = num_shards_per_partition
self._num_shards_per_partition = num_shards_per_partition
self._num_shards_used: Optional[int] = None
_check_if_natual_number(shard_size, "shard_size", True)
self._shard_size = shard_size
@@ -197,7 +197,7 @@ def num_partitions(self) -> int:
def _determine_partition_id_to_indices_if_needed(
self,
) -> None: # pylint: disable=R0914
"""Assign sample indices to each node id.
"""Assign sample indices to each partition id.
This method works on sorted datasets. A "shard" is a part of the dataset of
consecutive samples (if self._keep_incomplete_shard is False, each shard is same
@@ -208,12 +208,12 @@ def _determine_partition_id_to_indices_if_needed(
return

# One of the specification allows to skip the `num_shards_per_partition` param
if self._num_shards_per_node is not None:
if self._num_shards_per_partition is not None:
self._num_shards_used = int(
self._num_partitions * self._num_shards_per_node
self._num_partitions * self._num_shards_per_partition
)
num_shards_per_node_array = (
np.ones(self._num_partitions) * self._num_shards_per_node
num_shards_per_partition_array = (
np.ones(self._num_partitions) * self._num_shards_per_partition
)
if self._shard_size is None:
self._compute_shard_size_if_missing()
@@ -230,7 +230,7 @@ def _determine_partition_id_to_indices_if_needed(
num_usable_shards_in_dataset = int(
math.floor(len(self.dataset) / self._shard_size)
)
elif self._num_shards_per_node is None:
elif self._num_shards_per_partition is None:
if self._shard_size is None:
raise ValueError(
"The shard_size needs to be specified if the "
@@ -257,22 +257,22 @@ def _determine_partition_id_to_indices_if_needed(
else:
raise ValueError(
"The keep_incomplete_shards need to be specified "
"when _num_shards_per_node is None."
"when _num_shards_per_partition is None."
)
num_shards_per_node = int(self._num_shards_used / self._num_partitions)
# Assign the shards per nodes (so far, the same as in ideal case)
num_shards_per_node_array = (
np.ones(self._num_partitions) * num_shards_per_node
num_shards_per_partition = int(self._num_shards_used / self._num_partitions)
# Assign the shards per partitions (so far, the same as in ideal case)
num_shards_per_partition_array = (
np.ones(self._num_partitions) * num_shards_per_partition
)
num_shards_assigned = self._num_partitions * num_shards_per_node
num_shards_assigned = self._num_partitions * num_shards_per_partition
num_shards_to_assign = self._num_shards_used - num_shards_assigned
# Assign the "missing" shards
for i in range(num_shards_to_assign):
num_shards_per_node_array[i] += 1
num_shards_per_partition_array[i] += 1

else:
raise ValueError(
"The specification of nm_shards_per_node and "
"The specification of nm_shards_per_partition and "
"keep_incomplete_shards is not correct."
)

)

indices_on_which_to_split_shards = np.cumsum(
num_shards_per_node_array, dtype=int
num_shards_per_partition_array, dtype=int
)

shard_indices_array = self._rng.permutation(num_usable_shards_in_dataset)[
@@ -342,9 +342,9 @@ def _compute_shard_size_if_missing(self) -> None:
self._shard_size = int(num_rows / self._num_shards_used)

def _check_possibility_of_partitions_creation(self) -> None:
if self._shard_size is not None and self._num_shards_per_node is not None:
if self._shard_size is not None and self._num_shards_per_partition is not None:
implied_min_dataset_size = (
self._shard_size * self._num_shards_per_node * self._num_partitions
self._shard_size * self._num_shards_per_partition * self._num_partitions
)
if implied_min_dataset_size > len(self.dataset):
raise ValueError(
6 changes: 3 additions & 3 deletions datasets/flwr_datasets/partitioner/size_partitioner.py
@@ -32,9 +32,9 @@ class SizePartitioner(Partitioner):
`partition_id_to_size_fn(partition_id)` ~ number of samples for `partition_id`
If the function doesn't transform the `partition_id` it's a linear correlation
between the number of sample for the node and the value of `partition_id`. For
between the number of sample for the partition and the value of `partition_id`. For
instance, if the partition ids range from 1 to M, partition with id 1 gets 1 unit of
data, client 2 gets 2 units, and so on, up to node M which gets M units.
data, client 2 gets 2 units, and so on, up to partition M which gets M units.
Note that size corresponding to the `partition_id` is deterministic, yet in case of
different dataset shuffling the assignment of samples to `partition_id` will vary.
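A usage sketch of the size relationship described above. It assumes `SizePartitioner` can be imported from `flwr_datasets.partitioner` like the other partitioners in this repository, uses `mnist` as a placeholder dataset, and the size function is a made-up linear example:

>>> from flwr_datasets import FederatedDataset
>>> from flwr_datasets.partitioner import SizePartitioner
>>>
>>> # Partition with id `pid` gets pid + 1 units of data: 1, 2, ..., up to M units
>>> partitioner = SizePartitioner(num_partitions=4, partition_id_to_size_fn=lambda pid: pid + 1)
>>> fds = FederatedDataset(dataset="mnist", partitioners={"train": partitioner})
>>> partition = fds.load_partition(0, "train")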
@@ -44,7 +44,7 @@ class SizePartitioner(Partitioner):
num_partitions : int
The total number of partitions that the data will be divided into.
partition_id_to_size_fn : Callable
Function that defines the relationship between node id and the number of
Function that defines the relationship between partition id and the number of
samples.
"""

