From e93c5a881b9f10d19c95d3488936acb37a44c58d Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Mon, 11 Mar 2024 10:00:07 +0100 Subject: [PATCH 1/9] Rename node_id to partition_id --- datasets/flwr_datasets/federated_dataset.py | 6 ++--- .../partitioner/dirichlet_partitioner.py | 4 +++- .../partitioner/exponential_partitioner.py | 4 ++-- .../partitioner/linear_partitioner.py | 2 +- .../partitioner/shard_partitioner.py | 16 +++++++++----- .../partitioner/size_partitioner.py | 22 +++++++++---------- .../partitioner/square_partitioner.py | 2 +- 7 files changed, 32 insertions(+), 24 deletions(-) diff --git a/datasets/flwr_datasets/federated_dataset.py b/datasets/flwr_datasets/federated_dataset.py index c40f8cc34857..84649e87169f 100644 --- a/datasets/flwr_datasets/federated_dataset.py +++ b/datasets/flwr_datasets/federated_dataset.py @@ -102,7 +102,7 @@ def __init__( # Indicate if the dataset is prepared for `load_partition` or `load_full` self._dataset_prepared: bool = False - def load_partition(self, node_id: int, split: Optional[str] = None) -> Dataset: + def load_partition(self, partition_id: int, split: Optional[str] = None) -> Dataset: """Load the partition specified by the idx in the selected split. The dataset is downloaded only when the first call to `load_partition` or @@ -110,7 +110,7 @@ def load_partition(self, node_id: int, split: Optional[str] = None) -> Dataset: Parameters ---------- - node_id : int + partition_id : int Partition index for the selected split, idx in {0, ..., num_partitions - 1}. split : Optional[str] Name of the (partitioned) split (e.g. "train", "test"). You can skip this @@ -136,7 +136,7 @@ def load_partition(self, node_id: int, split: Optional[str] = None) -> Dataset: self._check_if_split_possible_to_federate(split) partitioner: Partitioner = self._partitioners[split] self._assign_dataset_to_partitioner(split) - return partitioner.load_partition(node_id) + return partitioner.load_partition(partition_id) def load_full(self, split: str) -> Dataset: """Load the full split of the dataset. diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py index 5f1df71991bb..82647d454fd1 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py @@ -78,7 +78,9 @@ class DirichletPartitioner(Partitioner): >>> print(partition[0]) # Print the first example {'image': , 'label': 4} - >>> partition_sizes = [len(fds.load_partition(node_id)) for node_id in range(10)] + >>> partition_sizes = partition_sizes = [ + >>> len(fds.load_partition(partition_id)) for partition_id in range(10) + >>> ] >>> print(sorted(partition_sizes)) [2134, 2615, 3646, 6011, 6170, 6386, 6715, 7653, 8435, 10235] """ diff --git a/datasets/flwr_datasets/partitioner/exponential_partitioner.py b/datasets/flwr_datasets/partitioner/exponential_partitioner.py index 10b11eb3e126..2ccb383a8e84 100644 --- a/datasets/flwr_datasets/partitioner/exponential_partitioner.py +++ b/datasets/flwr_datasets/partitioner/exponential_partitioner.py @@ -21,7 +21,7 @@ class ExponentialPartitioner(SizePartitioner): - """Partitioner creates partitions of size that are correlated with exp(node_id). + """Partitioner creates partitions of size that are correlated with exp(id). The amount of data each client gets is correlated with the exponent of partition ID. 
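A minimal sketch of this sizing rule (illustrative only; the exact floor-and-remainder
steps are assumed from the `SizePartitioner` code appearing later in this series):

>>> import numpy as np
>>> num_partitions, num_rows = 3, 100
>>> units = np.exp(np.linspace(1, num_partitions, num_partitions))  # e, e^2, e^3
>>> sizes = np.floor(units / units.sum() * num_rows).astype(int)  # fractions are cut
>>> sizes[-1] += num_rows - sizes.sum()  # remainder goes to the largest partition_id
>>> sizes.tolist()
[9, 24, 67]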
For instance, if the IDs range from 1 to M, client with ID 1 gets e units of @@ -29,7 +29,7 @@ class ExponentialPartitioner(SizePartitioner): The floor operation is applied on each of these numbers, it means floor(2.71...) = 2; e^2 ~ 7.39 floor(7.39) = 7. The number is rounded down = the fraction is always cut. The remainders of theses unassigned (fraction) samples is added to the - biggest partition (the one with the biggest node_id). + biggest partition (the one with the biggest partition_id). Parameters ---------- diff --git a/datasets/flwr_datasets/partitioner/linear_partitioner.py b/datasets/flwr_datasets/partitioner/linear_partitioner.py index f77b0b87146d..edc195e661c7 100644 --- a/datasets/flwr_datasets/partitioner/linear_partitioner.py +++ b/datasets/flwr_datasets/partitioner/linear_partitioner.py @@ -19,7 +19,7 @@ class LinearPartitioner(SizePartitioner): - """Partitioner creates partitions of size that are linearly correlated with node_id. + """Partitioner creates partitions of size that are linearly correlated with id. The amount of data each client gets is linearly correlated with the partition ID. For instance, if the IDs range from 1 to M, client with ID 1 gets 1 unit of data, diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner.py b/datasets/flwr_datasets/partitioner/shard_partitioner.py index 7c86570fe487..f5cdbc5e9058 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner.py @@ -100,7 +100,9 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 >>> print(partition[0]) # Print the first example {'image': , 'label': 3} - >>> partition_sizes = [len(fds.load_partition(node_id)) for node_id in range(10)] + >>> partition_sizes = [ + >>> len(fds.load_partition(partition_id)) for partition_id in range(10) + >>> ] >>> print(partition_sizes) [2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000] @@ -112,7 +114,9 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 >>> partitioner = ShardPartitioner(num_partitions=9, partition_by="label", >>> shard_size=1_000) >>> fds = FederatedDataset(dataset="mnist", partitioners={"train": partitioner}) - >>> partition_sizes = [len(fds.load_partition(node_id)) for node_id in range(9)] + >>> partition_sizes = [ + >>> len(fds.load_partition(partition_id)) for partition_id in range(9) + >>> ] >>> print(partition_sizes) [7000, 7000, 7000, 7000, 7000, 7000, 6000, 6000, 6000] @@ -123,7 +127,9 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 >>> partitioner = ShardPartitioner(num_partitions=10, partition_by="label", >>> shard_size=990, keep_incomplete_shard=True) >>> fds = FederatedDataset(dataset="mnist", partitioners={"train": partitioner}) - >>> partition_sizes = [len(fds.load_partition(node_id)) for node_id in range(10)] + >>> partition_sizes = [ + >>> len(fds.load_partition(partition_id)) for partition_id in range(10) + >>> ] >>> print(sorted(partition_sizes)) [5550, 5940, 5940, 5940, 5940, 5940, 5940, 5940, 5940, 6930] """ @@ -273,14 +279,14 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0 shard_indices_array = self._rng.permutation(num_usable_shards_in_dataset)[ : self._num_shards_used ] - # Randomly assign shards to node_id + # Randomly assign shards to partition_id nid_to_shard_indices = np.split( shard_indices_array, indices_on_which_to_split_shards )[:-1] node_id_to_indices: Dict[int, List[int]] = { cid: [] for cid in range(self._num_partitions) } - # Compute node_id to sample 
indices based on the shard indices + # Compute partition_id to sample indices based on the shard indices for node_id in range(self._num_partitions): for shard_idx in nid_to_shard_indices[node_id]: start_id = int(shard_idx * self._shard_size) diff --git a/datasets/flwr_datasets/partitioner/size_partitioner.py b/datasets/flwr_datasets/partitioner/size_partitioner.py index 35ca750949ee..6eb1f1825028 100644 --- a/datasets/flwr_datasets/partitioner/size_partitioner.py +++ b/datasets/flwr_datasets/partitioner/size_partitioner.py @@ -24,20 +24,20 @@ class SizePartitioner(Partitioner): - """Base class for the deterministic size partitioning based on the `node_id`. + """Base class for the deterministic size partitioning based on the `partition_id`. - The client with `node_id` has the following relationship regarding the number of - samples. + The client with `partition_id` has the following relationship regarding the number + of samples. - `node_id_to_size_fn(node_id)` ~ number of samples for `node_id` + `node_id_to_size_fn(partition_id)` ~ number of samples for `partition_id` - If the function doesn't transform the `node_id` it's a linear correlation between - the number of sample for the node and the value of `node_id`. For instance, if the - node ids range from 1 to M, node with id 1 gets 1 unit of data, client 2 gets 2 - units, and so on, up to node M which gets M units. + If the function doesn't transform the `partition_id` it's a linear correlation + between the number of sample for the node and the value of `partition_id`. For + instance, if the node ids range from 1 to M, node with id 1 gets 1 unit of data, + client 2 gets 2 units, and so on, up to node M which gets M units. - Note that size corresponding to the `node_id` is deterministic, yet in case of - different dataset shuffling the assignment of samples to `node_id` will vary. + Note that size corresponding to the `partition_id` is deterministic, yet in case of + different dataset shuffling the assignment of samples to `partition_id` will vary. Parameters ---------- @@ -67,7 +67,7 @@ def __init__( def load_partition(self, node_id: int) -> datasets.Dataset: """Load a single partition based on the partition index. - The number of samples is dependent on the partition node_id. + The number of samples is dependent on the partition partition_id. Parameters ---------- diff --git a/datasets/flwr_datasets/partitioner/square_partitioner.py b/datasets/flwr_datasets/partitioner/square_partitioner.py index 109b8397870b..c00e7cfb32e0 100644 --- a/datasets/flwr_datasets/partitioner/square_partitioner.py +++ b/datasets/flwr_datasets/partitioner/square_partitioner.py @@ -21,7 +21,7 @@ class SquarePartitioner(SizePartitioner): - """Partitioner creates partitions of size that are correlated with squared node_id. + """Partitioner creates partitions of size that are correlated with squared id. The amount of data each client gets is correlated with the squared partition ID. 
For instance, if the IDs range from 1 to M, client with ID 1 gets 1 unit of data, From 28c374ac3a4c596e8a6caea0271a14b290cef4e4 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 13 Mar 2024 17:36:30 +0000 Subject: [PATCH 2/9] Rename node_id to partition_id --- .../partitioner/dirichlet_partitioner.py | 6 +++--- .../partitioner/iid_partitioner.py | 6 +++--- .../partitioner/linear_partitioner.py | 2 +- .../partitioner/natural_id_partitioner.py | 20 +++++++++---------- .../flwr_datasets/partitioner/partitioner.py | 2 +- .../partitioner/size_partitioner_test.py | 2 +- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py index 076559989acb..286bdc7d7401 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py @@ -114,12 +114,12 @@ def __init__( # pylint: disable=R0913 self._node_id_to_indices: Dict[int, List[int]] = {} self._node_id_to_indices_determined = False - def load_partition(self, node_id: int) -> datasets.Dataset: + def load_partition(self, partition_id: int) -> datasets.Dataset: """Load a partition based on the partition index. Parameters ---------- - node_id : int + partition_id : int the index that corresponds to the requested partition Returns @@ -132,7 +132,7 @@ def load_partition(self, node_id: int) -> datasets.Dataset: # partition indices. self._check_num_partitions_correctness_if_needed() self._determine_node_id_to_indices_if_needed() - return self.dataset.select(self._node_id_to_indices[node_id]) + return self.dataset.select(self._node_id_to_indices[partition_id]) @property def num_partitions(self) -> int: diff --git a/datasets/flwr_datasets/partitioner/iid_partitioner.py b/datasets/flwr_datasets/partitioner/iid_partitioner.py index faa1dfa10615..ceddd386c7d3 100644 --- a/datasets/flwr_datasets/partitioner/iid_partitioner.py +++ b/datasets/flwr_datasets/partitioner/iid_partitioner.py @@ -34,12 +34,12 @@ def __init__(self, num_partitions: int) -> None: raise ValueError("The number of partitions must be greater than zero.") self._num_partitions = num_partitions - def load_partition(self, node_id: int) -> datasets.Dataset: + def load_partition(self, partition_id: int) -> datasets.Dataset: """Load a single IID partition based on the partition index. 
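A minimal usage sketch under the renamed argument (the toy dataset and its size are
assumptions made for illustration):

>>> from datasets import Dataset
>>> from flwr_datasets.partitioner import IidPartitioner
>>> partitioner = IidPartitioner(num_partitions=10)
>>> partitioner.dataset = Dataset.from_dict({"x": list(range(100))})
>>> len(partitioner.load_partition(partition_id=7))  # 100 rows / 10 contiguous shards
10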
Parameters ---------- - node_id : int + partition_id : int the index that corresponds to the requested partition Returns @@ -48,7 +48,7 @@ def load_partition(self, node_id: int) -> datasets.Dataset: single dataset partition """ return self.dataset.shard( - num_shards=self._num_partitions, index=node_id, contiguous=True + num_shards=self._num_partitions, index=partition_id, contiguous=True ) @property diff --git a/datasets/flwr_datasets/partitioner/linear_partitioner.py b/datasets/flwr_datasets/partitioner/linear_partitioner.py index edc195e661c7..5871440dad5e 100644 --- a/datasets/flwr_datasets/partitioner/linear_partitioner.py +++ b/datasets/flwr_datasets/partitioner/linear_partitioner.py @@ -32,6 +32,6 @@ class LinearPartitioner(SizePartitioner): """ def __init__(self, num_partitions: int) -> None: - super().__init__(num_partitions=num_partitions, node_id_to_size_fn=lambda x: x) + super().__init__(num_partitions=num_partitions, partition_id_to_size_fn=lambda x: x) if num_partitions <= 0: raise ValueError("The number of partitions must be greater than zero.") diff --git a/datasets/flwr_datasets/partitioner/natural_id_partitioner.py b/datasets/flwr_datasets/partitioner/natural_id_partitioner.py index 947501965cc6..8e3502948dc6 100644 --- a/datasets/flwr_datasets/partitioner/natural_id_partitioner.py +++ b/datasets/flwr_datasets/partitioner/natural_id_partitioner.py @@ -29,28 +29,28 @@ def __init__( partition_by: str, ): super().__init__() - self._node_id_to_natural_id: Dict[int, str] = {} + self._partition_id_to_natural_id: Dict[int, str] = {} self._partition_by = partition_by - def _create_int_node_id_to_natural_id(self) -> None: + def _create_int_partition_id_to_natural_id(self) -> None: """Create a mapping from int indices to unique client ids from dataset. Natural ids come from the column specified in `partition_by`. """ unique_natural_ids = self.dataset.unique(self._partition_by) - self._node_id_to_natural_id = dict( + self._partition_id_to_natural_id = dict( zip(range(len(unique_natural_ids)), unique_natural_ids) ) - def load_partition(self, node_id: int) -> datasets.Dataset: - """Load a single partition corresponding to a single `node_id`. + def load_partition(self, partition_id: int) -> datasets.Dataset: + """Load a single partition corresponding to a single `partition_id`. The choice of the partition is based on unique integers assigned to each natural id present in the dataset in the `partition_by` column. 
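A minimal sketch of the mapping built by `_create_int_partition_id_to_natural_id`
(the column values below are illustrative assumptions):

>>> unique_natural_ids = ["alice", "bob", "carol"]  # unique values of `partition_by`
>>> dict(zip(range(len(unique_natural_ids)), unique_natural_ids))
{0: 'alice', 1: 'bob', 2: 'carol'}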
Parameters ---------- - node_id : int + partition_id : int the index that corresponds to the requested partition Returns @@ -58,17 +58,17 @@ def load_partition(self, node_id: int) -> datasets.Dataset: dataset_partition : Dataset single dataset partition """ - if len(self._node_id_to_natural_id) == 0: - self._create_int_node_id_to_natural_id() + if len(self._partition_id_to_natural_id) == 0: + self._create_int_partition_id_to_natural_id() return self.dataset.filter( - lambda row: row[self._partition_by] == self._node_id_to_natural_id[node_id] + lambda row: row[self._partition_by] == self._partition_id_to_natural_id[partition_id] ) @property def num_partitions(self) -> int: """Total number of partitions.""" - if len(self._node_id_to_natural_id) == 0: + if len(self._partition_id_to_natural_id) == 0: self._create_int_node_id_to_natural_id() return len(self._node_id_to_natural_id) diff --git a/datasets/flwr_datasets/partitioner/partitioner.py b/datasets/flwr_datasets/partitioner/partitioner.py index 73eb6f4a17b3..6b623ede2ba8 100644 --- a/datasets/flwr_datasets/partitioner/partitioner.py +++ b/datasets/flwr_datasets/partitioner/partitioner.py @@ -53,7 +53,7 @@ def dataset(self, value: Dataset) -> None: self._dataset = value @abstractmethod - def load_partition(self, node_id: int) -> Dataset: + def load_partition(self, partition_id: int) -> Dataset: """Load a single partition based on the partition index. Parameters diff --git a/datasets/flwr_datasets/partitioner/size_partitioner_test.py b/datasets/flwr_datasets/partitioner/size_partitioner_test.py index 390f6a613fce..71126248b1f8 100644 --- a/datasets/flwr_datasets/partitioner/size_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/size_partitioner_test.py @@ -49,7 +49,7 @@ def test_linear_distribution(self, num_partitions: int, num_rows: int) -> None: partitioner.dataset = dataset # Run a single partition loading to trigger the division _ = partitioner.load_partition(0) - total_samples = sum(partitioner.node_id_to_size.values()) + total_samples = sum(partitioner.partition_id_to_size.values()) self.assertEqual(total_samples, num_rows) # Testing if each partition is getting more than the previous one From 3420b01b01e66dd91ce79bcef51e3704d10e7457 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 13 Mar 2024 17:39:13 +0000 Subject: [PATCH 3/9] Rename node_id to partition_id --- .../partitioner/dirichlet_partitioner.py | 30 ++++++++-------- .../partitioner/dirichlet_partitioner_test.py | 10 +++--- .../partitioner/exponential_partitioner.py | 2 +- .../inner_dirichlet_partitioner.py | 26 +++++++------- .../inner_dirichlet_partitioner_test.py | 4 +-- .../partitioner/natural_id_partitioner.py | 14 ++++---- .../natural_id_partitioner_test.py | 14 ++++---- .../flwr_datasets/partitioner/partitioner.py | 2 +- .../partitioner/shard_partitioner.py | 32 ++++++++--------- .../partitioner/shard_partitioner_test.py | 8 ++--- .../partitioner/size_partitioner.py | 36 +++++++++---------- .../partitioner/size_partitioner_test.py | 4 +-- .../partitioner/square_partitioner.py | 2 +- 13 files changed, 92 insertions(+), 92 deletions(-) diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py index 286bdc7d7401..b12d8770932f 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py @@ -111,8 +111,8 @@ def __init__( # pylint: disable=R0913 # The attributes below are determined during the first call 
to load_partition self._avg_num_of_samples_per_node: Optional[float] = None self._unique_classes: Optional[Union[List[int], List[str]]] = None - self._node_id_to_indices: Dict[int, List[int]] = {} - self._node_id_to_indices_determined = False + self._partition_id_to_indices: Dict[int, List[int]] = {} + self._partition_id_to_indices_determined = False def load_partition(self, partition_id: int) -> datasets.Dataset: """Load a partition based on the partition index. @@ -131,14 +131,14 @@ def load_partition(self, partition_id: int) -> datasets.Dataset: # requested. Only the first call creates the indices assignments for all the # partition indices. self._check_num_partitions_correctness_if_needed() - self._determine_node_id_to_indices_if_needed() - return self.dataset.select(self._node_id_to_indices[partition_id]) + self._determine_partition_id_to_indices_if_needed() + return self.dataset.select(self._partition_id_to_indices[partition_id]) @property def num_partitions(self) -> int: """Total number of partitions.""" self._check_num_partitions_correctness_if_needed() - self._determine_node_id_to_indices_if_needed() + self._determine_partition_id_to_indices_if_needed() return self._num_partitions def _initialize_alpha( @@ -194,9 +194,9 @@ def _initialize_alpha( ) return alpha - def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 + def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 """Create an assignment of indices to the partition indices.""" - if self._node_id_to_indices_determined: + if self._partition_id_to_indices_determined: return # Generate information needed for Dirichlet partitioning @@ -213,9 +213,9 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0 sampling_try = 0 while True: # Prepare data structure to store indices assigned to node ids - node_id_to_indices: Dict[int, List[int]] = {} + partition_id_to_indices: Dict[int, List[int]] = {} for nid in range(self._num_partitions): - node_id_to_indices[nid] = [] + partition_id_to_indices[nid] = [] # Iterated over all unique labels (they are not necessarily of type int) for k in self._unique_classes: @@ -238,7 +238,7 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0 assert self._avg_num_of_samples_per_node is not None for nid in nid_to_proportion_of_k_samples.copy(): if ( - len(node_id_to_indices[nid]) + len(partition_id_to_indices[nid]) > self._avg_num_of_samples_per_node ): nid_to_proportion_of_k_samples[nid] = 0 @@ -264,18 +264,18 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0 ) # Append new indices (coming from class k) to the existing indices - for nid, indices in node_id_to_indices.items(): + for nid, indices in partition_id_to_indices.items(): indices.extend(split_indices[nid].tolist()) # Determine if the indices assignment meets the min_partition_size # If it does not mean the requirement repeat the Dirichlet sampling process # Otherwise break the while loop min_sample_size_on_client = min( - len(indices) for indices in node_id_to_indices.values() + len(indices) for indices in partition_id_to_indices.values() ) if min_sample_size_on_client >= self._min_partition_size: break - sample_sizes = [len(indices) for indices in node_id_to_indices.values()] + sample_sizes = [len(indices) for indices in partition_id_to_indices.values()] alpha_not_met = [ self._alpha[i] for i, ss in enumerate(sample_sizes) @@ -311,10 +311,10 @@ def _determine_node_id_to_indices_if_needed(self) 
-> None: # pylint: disable=R0 # Shuffle the indices not to have the datasets with targets in sequences like # [00000, 11111, ...]) if the shuffle is True if self._shuffle: - for indices in node_id_to_indices.values(): + for indices in partition_id_to_indices.values(): # In place shuffling self._rng.shuffle(indices) - self._node_id_to_indices = node_id_to_indices + self._partition_id_to_indices = partition_id_to_indices self._node_id_to_indices_determined = True def _check_num_partitions_correctness_if_needed(self) -> None: diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner_test.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner_test.py index c123f84effb7..4e9ace1532b8 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner_test.py @@ -77,7 +77,7 @@ def test_valid_initialization( def test_min_partition_size_requirement(self) -> None: """Test if partitions are created with min partition size required.""" _, partitioner = _dummy_setup(3, 0.5, 100, "labels") - partition_list = [partitioner.load_partition(node_id) for node_id in [0, 1, 2]] + partition_list = [partitioner.load_partition(partition_id) for partition_id in [0, 1, 2]] self.assertTrue( all(len(p) > partitioner._min_partition_size for p in partition_list) ) @@ -87,14 +87,14 @@ def test_alpha_in_ndarray_initialization(self) -> None: _, partitioner = _dummy_setup(3, np.array([1.0, 1.0, 1.0]), 100, "labels") self.assertTrue(np.all(partitioner._alpha == np.array([1.0, 1.0, 1.0]))) - def test__determine_node_id_to_indices(self) -> None: + def test__determine_partition_id_to_indices(self) -> None: """Test the determine_nod_id_to_indices matches the flag after the call.""" num_partitions, alpha, num_rows, partition_by = 3, 0.5, 100, "labels" _, partitioner = _dummy_setup(num_partitions, alpha, num_rows, partition_by) - partitioner._determine_node_id_to_indices_if_needed() + partitioner._determine_partition_id_to_indices_if_needed() self.assertTrue( - partitioner._node_id_to_indices_determined - and len(partitioner._node_id_to_indices) == num_partitions + partitioner._partition_id_to_indices_determined + and len(partitioner._partition_id_to_indices) == num_partitions ) diff --git a/datasets/flwr_datasets/partitioner/exponential_partitioner.py b/datasets/flwr_datasets/partitioner/exponential_partitioner.py index 2ccb383a8e84..d35944f29f6f 100644 --- a/datasets/flwr_datasets/partitioner/exponential_partitioner.py +++ b/datasets/flwr_datasets/partitioner/exponential_partitioner.py @@ -38,6 +38,6 @@ class ExponentialPartitioner(SizePartitioner): """ def __init__(self, num_partitions: int) -> None: - super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.exp) + super().__init__(num_partitions=num_partitions, partition_id_to_size_fn=np.exp) if num_partitions <= 0: raise ValueError("The number of partitions must be greater than zero.") diff --git a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py index bf07ab3591f5..6c53e4ecece2 100644 --- a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py @@ -92,15 +92,15 @@ def __init__( # pylint: disable=R0913 self._num_partitions = len(self._partition_sizes) # self._avg_num_of_samples_per_node: Optional[float] = None - self._node_id_to_indices: Dict[int, List[int]] = {} - self._node_id_to_indices_determined = 
False + self._partition_id_to_indices: Dict[int, List[int]] = {} + self._partition_id_to_indices_determined = False - def load_partition(self, node_id: int) -> datasets.Dataset: + def load_partition(self, partition_id: int) -> datasets.Dataset: """Load a partition based on the partition index. Parameters ---------- - node_id : int + partition_id : int the index that corresponds to the requested partition Returns @@ -116,8 +116,8 @@ def load_partition(self, node_id: int) -> datasets.Dataset: self._check_the_sum_of_partition_sizes() self._determine_num_unique_classes_if_needed() self._alpha = self._initialize_alpha_if_needed(self._initial_alpha) - self._determine_node_id_to_indices_if_needed() - return self.dataset.select(self._node_id_to_indices[node_id]) + self._determine_partition_id_to_indices_if_needed() + return self.dataset.select(self._partition_id_to_indices[partition_id]) @property def num_partitions(self) -> int: @@ -127,7 +127,7 @@ def num_partitions(self) -> int: self._check_the_sum_of_partition_sizes() self._determine_num_unique_classes_if_needed() self._alpha = self._initialize_alpha_if_needed(self._initial_alpha) - self._determine_node_id_to_indices_if_needed() + self._determine_partition_id_to_indices_if_needed() return self._num_partitions def _initialize_alpha_if_needed( @@ -190,9 +190,9 @@ def _initialize_alpha_if_needed( ) return alpha - def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 + def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 """Create an assignment of indices to the partition indices.""" - if self._node_id_to_indices_determined: + if self._partition_id_to_indices_determined: return # Create class priors for the whole partitioning process @@ -210,14 +210,14 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0 ] # Node id to number of sample left for allocation for that node id - node_id_to_left_to_allocate = dict( + partition_id_to_left_to_allocate = dict( zip(range(self._num_partitions), self._partition_sizes) ) - not_full_node_ids = list(range(self._num_partitions)) - while np.sum(list(node_id_to_left_to_allocate.values())) != 0: + not_full_partition_ids = list(range(self._num_partitions)) + while np.sum(list(partition_id_to_left_to_allocate.values())) != 0: # Choose a node - current_node_id = self._rng.choice(not_full_node_ids) + current_partition_id = self._rng.choice(not_full_partition_ids) # If current node is full resample a client if node_id_to_left_to_allocate[current_node_id] == 0: # When the node is full, exclude it from the sampling nodes list diff --git a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner_test.py b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner_test.py index 0c5fb502870e..86dc8a5df532 100644 --- a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner_test.py @@ -58,7 +58,7 @@ def test_correct_num_of_partitions(self) -> None: _, partitioner = _dummy_setup(num_rows, partition_by, partition_sizes, alpha) _ = partitioner.load_partition(0) self.assertEqual( - len(partitioner._node_id_to_indices.keys()), len(partition_sizes) + len(partitioner._partition_id_to_indices.keys()), len(partition_sizes) ) def test_correct_partition_sizes(self) -> None: @@ -71,7 +71,7 @@ def test_correct_partition_sizes(self) -> None: _, partitioner = _dummy_setup(num_rows, partition_by, partition_sizes, alpha) _ = partitioner.load_partition(0) 
sizes_created = [ - len(indices) for indices in partitioner._node_id_to_indices.values() + len(indices) for indices in partitioner._partition_id_to_indices.values() ] self.assertEqual(sorted(sizes_created), partition_sizes) diff --git a/datasets/flwr_datasets/partitioner/natural_id_partitioner.py b/datasets/flwr_datasets/partitioner/natural_id_partitioner.py index 8e3502948dc6..874a251585d0 100644 --- a/datasets/flwr_datasets/partitioner/natural_id_partitioner.py +++ b/datasets/flwr_datasets/partitioner/natural_id_partitioner.py @@ -69,20 +69,20 @@ def load_partition(self, partition_id: int) -> datasets.Dataset: def num_partitions(self) -> int: """Total number of partitions.""" if len(self._partition_id_to_natural_id) == 0: - self._create_int_node_id_to_natural_id() - return len(self._node_id_to_natural_id) + self._create_int_partition_id_to_natural_id() + return len(self._partition_id_to_natural_id) @property - def node_id_to_natural_id(self) -> Dict[int, str]: + def partition_id_to_natural_id(self) -> Dict[int, str]: """Node id to corresponding natural id present. Natural ids are the unique values in `partition_by` column in dataset. """ - return self._node_id_to_natural_id + return self._partition_id_to_natural_id # pylint: disable=R0201 - @node_id_to_natural_id.setter - def node_id_to_natural_id(self, value: Dict[int, str]) -> None: + @partition_id_to_natural_id.setter + def partition_id_to_natural_id(self, value: Dict[int, str]) -> None: raise AttributeError( - "Setting the node_id_to_natural_id dictionary is not allowed." + "Setting the partition_id_to_natural_id dictionary is not allowed." ) diff --git a/datasets/flwr_datasets/partitioner/natural_id_partitioner_test.py b/datasets/flwr_datasets/partitioner/natural_id_partitioner_test.py index fb296294aec3..1677b9f00be9 100644 --- a/datasets/flwr_datasets/partitioner/natural_id_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/natural_id_partitioner_test.py @@ -65,9 +65,9 @@ def test_load_partition_num_partitions( Only the correct data is tested in this method. 
""" _, partitioner = _dummy_setup(num_rows, num_unique_natural_id) - # Simulate usage to start lazy node_id_to_natural_id creation + # Simulate usage to start lazy partition_id_to_natural_id creation _ = partitioner.load_partition(0) - self.assertEqual(len(partitioner.node_id_to_natural_id), num_unique_natural_id) + self.assertEqual(len(partitioner.partition_id_to_natural_id), num_unique_natural_id) @parameterized.expand( # type: ignore # num_rows, num_unique_natural_ids @@ -105,14 +105,14 @@ def test_correct_number_of_partitions( ) -> None: """Test if the # of available partitions is equal to # of unique clients.""" _, partitioner = _dummy_setup(num_rows, num_unique_natural_ids) - _ = partitioner.load_partition(node_id=0) - self.assertEqual(len(partitioner.node_id_to_natural_id), num_unique_natural_ids) + _ = partitioner.load_partition(partition_id=0) + self.assertEqual(len(partitioner.partition_id_to_natural_id), num_unique_natural_ids) - def test_cannot_set_node_id_to_natural_id(self) -> None: - """Test the lack of ability to set node_id_to_natural_id.""" + def test_cannot_set_partition_id_to_natural_id(self) -> None: + """Test the lack of ability to set partition_id_to_natural_id.""" _, partitioner = _dummy_setup(num_rows=10, n_unique_natural_ids=2) with self.assertRaises(AttributeError): - partitioner.node_id_to_natural_id = {0: "0"} + partitioner.partition_id_to_natural_id = {0: "0"} if __name__ == "__main__": diff --git a/datasets/flwr_datasets/partitioner/partitioner.py b/datasets/flwr_datasets/partitioner/partitioner.py index 6b623ede2ba8..10ade52640e8 100644 --- a/datasets/flwr_datasets/partitioner/partitioner.py +++ b/datasets/flwr_datasets/partitioner/partitioner.py @@ -58,7 +58,7 @@ def load_partition(self, partition_id: int) -> Dataset: Parameters ---------- - node_id : int + partition_id : int the index that corresponds to the requested partition Returns diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner.py b/datasets/flwr_datasets/partitioner/shard_partitioner.py index 7a6f6b23e265..68a6084ff38b 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner.py @@ -160,15 +160,15 @@ def __init__( # pylint: disable=R0913 # Utility attributes self._rng = np.random.default_rng(seed=self._seed) # NumPy random generator - self._node_id_to_indices: Dict[int, List[int]] = {} - self._node_id_to_indices_determined = False + self._partition_id_to_indices: Dict[int, List[int]] = {} + self._partition_id_to_indices_determined = False - def load_partition(self, node_id: int) -> datasets.Dataset: + def load_partition(self, partition_id: int) -> datasets.Dataset: """Load a partition based on the partition index. 
Parameters ---------- - node_id : int + partition_id : int the index that corresponds to the requested partition Returns @@ -182,8 +182,8 @@ def load_partition(self, node_id: int) -> datasets.Dataset: self._check_num_partitions_correctness_if_needed() self._check_possibility_of_partitions_creation() self._sort_dataset_if_needed() - self._determine_node_id_to_indices_if_needed() - return self.dataset.select(self._node_id_to_indices[node_id]) + self._determine_partition_id_to_indices_if_needed() + return self.dataset.select(self._partition_id_to_indices[partition_id]) @property def num_partitions(self) -> int: @@ -191,18 +191,18 @@ def num_partitions(self) -> int: self._check_num_partitions_correctness_if_needed() self._check_possibility_of_partitions_creation() self._sort_dataset_if_needed() - self._determine_node_id_to_indices_if_needed() + self._determine_partition_id_to_indices_if_needed() return self._num_partitions - def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 + def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 """Assign sample indices to each node id. This method works on sorted datasets. A "shard" is a part of the dataset of consecutive samples (if self._keep_incomplete_shard is False, each shard is same size). """ - # No need to do anything if that node_id_to_indices are already determined - if self._node_id_to_indices_determined: + # No need to do anything if that partition_id_to_indices are already determined + if self._partition_id_to_indices_determined: return # One of the specification allows to skip the `num_shards_per_node` param @@ -292,20 +292,20 @@ def _determine_node_id_to_indices_if_needed(self) -> None: # pylint: disable=R0 nid_to_shard_indices = np.split( shard_indices_array, indices_on_which_to_split_shards )[:-1] - node_id_to_indices: Dict[int, List[int]] = { + partition_id_to_indices: Dict[int, List[int]] = { cid: [] for cid in range(self._num_partitions) } # Compute partition_id to sample indices based on the shard indices - for node_id in range(self._num_partitions): - for shard_idx in nid_to_shard_indices[node_id]: + for partition_id in range(self._num_partitions): + for shard_idx in nid_to_shard_indices[partition_id]: start_id = int(shard_idx * self._shard_size) end_id = min(int((shard_idx + 1) * self._shard_size), len(self.dataset)) - node_id_to_indices[node_id].extend(list(range(start_id, end_id))) + partition_id_to_indices[partition_id].extend(list(range(start_id, end_id))) if self._shuffle: - for indices in node_id_to_indices.values(): + for indices in partition_id_to_indices.values(): # In place shuffling self._rng.shuffle(indices) - self._node_id_to_indices = node_id_to_indices + self._partition_id_to_indices = node_id_to_indices self._node_id_to_indices_determined = True def _check_num_partitions_correctness_if_needed(self) -> None: diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner_test.py b/datasets/flwr_datasets/partitioner/shard_partitioner_test.py index 47968699bba7..62b21fe640c3 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner_test.py @@ -71,7 +71,7 @@ def test_correct_num_partitions(self) -> None: keep_incomplete_shard, ) _ = partitioner.load_partition(0) - num_partitions_created = len(partitioner._node_id_to_indices.keys()) + num_partitions_created = len(partitioner._partition_id_to_indices.keys()) self.assertEqual(num_partitions_created, num_partitions) def 
test_correct_partition_sizes(self) -> None: @@ -145,7 +145,7 @@ def test_correct_num_partitions(self) -> None: keep_incomplete_shard, ) _ = partitioner.load_partition(0) - num_partitions_created = len(partitioner._node_id_to_indices.keys()) + num_partitions_created = len(partitioner._partition_id_to_indices.keys()) self.assertEqual(num_partitions_created, num_partitions) def test_correct_partition_sizes(self) -> None: @@ -219,7 +219,7 @@ def test_correct_num_partitions(self) -> None: keep_incomplete_shard, ) _ = partitioner.load_partition(0) - num_partitions_created = len(partitioner._node_id_to_indices.keys()) + num_partitions_created = len(partitioner._partition_id_to_indices.keys()) self.assertEqual(num_partitions_created, num_partitions) def test_correct_partition_sizes(self) -> None: @@ -292,7 +292,7 @@ def test_correct_num_partitions(self) -> None: keep_incomplete_shard, ) _ = partitioner.load_partition(0) - num_partitions_created = len(partitioner._node_id_to_indices.keys()) + num_partitions_created = len(partitioner._partition_id_to_indices.keys()) self.assertEqual(num_partitions_created, num_partitions) def test_correct_partition_sizes(self) -> None: diff --git a/datasets/flwr_datasets/partitioner/size_partitioner.py b/datasets/flwr_datasets/partitioner/size_partitioner.py index b2a629e33924..e4c7b9bc6691 100644 --- a/datasets/flwr_datasets/partitioner/size_partitioner.py +++ b/datasets/flwr_datasets/partitioner/size_partitioner.py @@ -29,7 +29,7 @@ class SizePartitioner(Partitioner): The client with `partition_id` has the following relationship regarding the number of samples. - `node_id_to_size_fn(partition_id)` ~ number of samples for `partition_id` + `partition_id_to_size_fn(partition_id)` ~ number of samples for `partition_id` If the function doesn't transform the `partition_id` it's a linear correlation between the number of sample for the node and the value of `partition_id`. For @@ -43,7 +43,7 @@ class SizePartitioner(Partitioner): ---------- num_partitions : int The total number of partitions that the data will be divided into. - node_id_to_size_fn : Callable + partition_id_to_size_fn : Callable Function that defines the relationship between node id and the number of samples. """ @@ -51,27 +51,27 @@ class SizePartitioner(Partitioner): def __init__( self, num_partitions: int, - node_id_to_size_fn: Callable, # type: ignore[type-arg] + partition_id_to_size_fn: Callable, # type: ignore[type-arg] ) -> None: super().__init__() if num_partitions <= 0: raise ValueError("The number of partitions must be greater than zero.") self._num_partitions = num_partitions - self._node_id_to_size_fn = node_id_to_size_fn + self._partition_id_to_size_fn = partition_id_to_size_fn - self._node_id_to_size: Dict[int, int] = {} - self._node_id_to_indices: Dict[int, List[int]] = {} + self._partition_id_to_size: Dict[int, int] = {} + self._partition_id_to_indices: Dict[int, List[int]] = {} # A flag to perform only a single compute to determine the indices - self._node_id_to_indices_determined = False + self._partition_id_to_indices_determined = False - def load_partition(self, node_id: int) -> datasets.Dataset: + def load_partition(self, partition_id: int) -> datasets.Dataset: """Load a single partition based on the partition index. The number of samples is dependent on the partition partition_id. 
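A hedged usage sketch with `LinearPartitioner` (a `SizePartitioner` using the identity
size function); the toy dataset is an assumption made for illustration:

>>> from datasets import Dataset
>>> from flwr_datasets.partitioner import LinearPartitioner
>>> partitioner = LinearPartitioner(num_partitions=4)
>>> partitioner.dataset = Dataset.from_dict({"x": list(range(100))})
>>> [len(partitioner.load_partition(pid)) for pid in range(4)]
[10, 20, 30, 40]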
Parameters ---------- - node_id : int + partition_id : int the index that corresponds to the requested partition Returns @@ -81,28 +81,28 @@ def load_partition(self, node_id: int) -> datasets.Dataset: """ # The partitioning is done lazily - only when the first partition is requested. # A single run creates the indices assignments for all the partition indices. - self._determine_node_id_to_indices_if_needed() - return self.dataset.select(self._node_id_to_indices[node_id]) + self._determine_partition_id_to_indices_if_needed() + return self.dataset.select(self._partition_id_to_indices[partition_id]) @property def num_partitions(self) -> int: """Total number of partitions.""" - self._determine_node_id_to_indices_if_needed() + self._determine_partition_id_to_indices_if_needed() return self._num_partitions @property - def node_id_to_size(self) -> Dict[int, int]: + def partition_id_to_size(self) -> Dict[int, int]: """Node id to the number of samples.""" - return self._node_id_to_size + return self._partition_id_to_size @property - def node_id_to_indices(self) -> Dict[int, List[int]]: + def partition_id_to_indices(self) -> Dict[int, List[int]]: """Node id to the list of indices.""" - return self._node_id_to_indices + return self._partition_id_to_indices - def _determine_node_id_to_size(self) -> None: + def _determine_partition_id_to_size(self) -> None: """Determine data quantity associated with partition indices.""" - data_division_in_units = self._node_id_to_size_fn( + data_division_in_units = self._partition_id_to_size_fn( np.linspace(start=1, stop=self._num_partitions, num=self._num_partitions) ) total_units: Union[int, float] = data_division_in_units.sum() diff --git a/datasets/flwr_datasets/partitioner/size_partitioner_test.py b/datasets/flwr_datasets/partitioner/size_partitioner_test.py index 71126248b1f8..086ca3731e58 100644 --- a/datasets/flwr_datasets/partitioner/size_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/size_partitioner_test.py @@ -55,7 +55,7 @@ def test_linear_distribution(self, num_partitions: int, num_rows: int) -> None: # Testing if each partition is getting more than the previous one last_count = 0 for i in range(num_partitions): - current_count = partitioner.node_id_to_size[i] + current_count = partitioner.partition_id_to_size[i] self.assertGreaterEqual(current_count, last_count) last_count = current_count @@ -77,7 +77,7 @@ def test_undivided_samples(self, num_partitions: int, num_rows: int) -> None: actual_samples_in_last_partition = len( partitioner.load_partition(last_partition_id) ) - expected_samples_in_last_partition = partitioner.node_id_to_size[ + expected_samples_in_last_partition = partitioner.partition_id_to_size[ last_partition_id ] self.assertEqual( diff --git a/datasets/flwr_datasets/partitioner/square_partitioner.py b/datasets/flwr_datasets/partitioner/square_partitioner.py index c00e7cfb32e0..7ff2838d18e6 100644 --- a/datasets/flwr_datasets/partitioner/square_partitioner.py +++ b/datasets/flwr_datasets/partitioner/square_partitioner.py @@ -34,6 +34,6 @@ class SquarePartitioner(SizePartitioner): """ def __init__(self, num_partitions: int) -> None: - super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.square) + super().__init__(num_partitions=num_partitions, partition_id_to_size_fn=np.square) if num_partitions <= 0: raise ValueError("The number of partitions must be greater than zero.") From 0ee35853137531c08985b24f5b4da34151c8e3a6 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 13 Mar 2024 17:50:24 +0000 Subject: 
[PATCH 4/9] Rename node_id to partition_id --- .../partitioner/dirichlet_partitioner.py | 4 +-- .../inner_dirichlet_partitioner.py | 26 +++++++++---------- .../partitioner/shard_partitioner.py | 8 +++--- .../partitioner/size_partitioner.py | 20 +++++++------- 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py index b12d8770932f..3e083349a1a8 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py @@ -315,11 +315,11 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab # In place shuffling self._rng.shuffle(indices) self._partition_id_to_indices = partition_id_to_indices - self._node_id_to_indices_determined = True + self._partition_id_to_indices_determined = True def _check_num_partitions_correctness_if_needed(self) -> None: """Test num_partitions when the dataset is given (in load_partition).""" - if not self._node_id_to_indices_determined: + if not self._partition_id_to_indices_determined: if self._num_partitions > self.dataset.num_rows: raise ValueError( "The number of partitions needs to be smaller than the number of " diff --git a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py index 6c53e4ecece2..0bf07acac688 100644 --- a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py @@ -219,13 +219,13 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab # Choose a node current_partition_id = self._rng.choice(not_full_partition_ids) # If current node is full resample a client - if node_id_to_left_to_allocate[current_node_id] == 0: + if partition_id_to_left_to_allocate[current_partition_id] == 0: # When the node is full, exclude it from the sampling nodes list - not_full_node_ids.pop(not_full_node_ids.index(current_node_id)) + not_full_partition_ids.pop(not_full_partition_ids.index(current_partition_id)) continue - node_id_to_left_to_allocate[current_node_id] -= 1 + partition_id_to_left_to_allocate[current_partition_id] -= 1 # Access the label distribution of the chosen client - current_probabilities = class_priors[current_node_id] + current_probabilities = class_priors[current_partition_id] while True: # curr_class = np.argmax(np.random.uniform() <= curr_prior) curr_class = self._rng.choice( @@ -240,32 +240,32 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab row_sums = class_priors.sum(axis=1, keepdims=True) class_priors = class_priors / row_sums # Adjust the current_probabilities (it won't sum up to 1 otherwise) - current_probabilities = class_priors[current_node_id] + current_probabilities = class_priors[current_partition_id] continue class_sizes[curr_class] -= 1 # Store sample index at the empty array cell - index = node_id_to_left_to_allocate[current_node_id] - client_indices[current_node_id][index] = idx_list[curr_class][ + index = partition_id_to_left_to_allocate[current_partition_id] + client_indices[current_partition_id][index] = idx_list[curr_class][ class_sizes[curr_class] ] break - node_id_to_indices = { + partition_id_to_indices = { cid: client_indices[cid].tolist() for cid in range(self._num_partitions) } # Shuffle the indices if the shuffle is True. 
# Note that the samples from this partitioning do not necessarily require # shuffling, the order should exhibit consecutive samples. if self._shuffle: - for indices in node_id_to_indices.values(): + for indices in partition_id_to_indices.values(): # In place shuffling self._rng.shuffle(indices) - self._node_id_to_indices = node_id_to_indices - self._node_id_to_indices_determined = True + self._partition_id_to_indices = partition_id_to_indices + self._partition_id_to_indices_determined = True def _check_num_partitions_correctness_if_needed(self) -> None: """Test num_partitions when the dataset is given (in load_partition).""" - if not self._node_id_to_indices_determined: + if not self._partition_id_to_indices_determined: if self._num_partitions > self.dataset.num_rows: raise ValueError( "The number of partitions needs to be smaller or equal to " @@ -274,7 +274,7 @@ def _check_num_partitions_correctness_if_needed(self) -> None: def _check_partition_sizes_correctness_if_needed(self) -> None: """Test partition_sizes when the dataset is given (in load_partition).""" - if not self._node_id_to_indices_determined: + if not self._partition_id_to_indices_determined: if sum(self._partition_sizes) > self.dataset.num_rows: raise ValueError( "The sum of the `partition_sizes` needs to be smaller or equal to " diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner.py b/datasets/flwr_datasets/partitioner/shard_partitioner.py index 68a6084ff38b..514e5ad64567 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner.py @@ -305,12 +305,12 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab for indices in partition_id_to_indices.values(): # In place shuffling self._rng.shuffle(indices) - self._partition_id_to_indices = node_id_to_indices - self._node_id_to_indices_determined = True + self._partition_id_to_indices = partition_id_to_indices + self._partition_id_to_indices_determined = True def _check_num_partitions_correctness_if_needed(self) -> None: """Test num_partitions when the dataset is given (in load_partition).""" - if not self._node_id_to_indices_determined: + if not self._partition_id_to_indices_determined: if self._num_partitions > self.dataset.num_rows: raise ValueError( "The number of partitions needs to be smaller than the number of " @@ -323,7 +323,7 @@ def _sort_dataset_if_needed(self) -> None: Operation only needed to be performed one time. It's required for the creation of shards with the same labels. """ - if self._node_id_to_indices_determined: + if self._partition_id_to_indices_determined: return self._dataset = self.dataset.sort(self._partition_by) diff --git a/datasets/flwr_datasets/partitioner/size_partitioner.py b/datasets/flwr_datasets/partitioner/size_partitioner.py index e4c7b9bc6691..7e1836ca14f5 100644 --- a/datasets/flwr_datasets/partitioner/size_partitioner.py +++ b/datasets/flwr_datasets/partitioner/size_partitioner.py @@ -118,25 +118,25 @@ def _determine_partition_id_to_size(self) -> None: # If there is any sample(s) left unassigned, assign it to the largest partition. 
partition_sizes_as_num_of_samples[-1] += left_unassigned_samples for idx, partition_size in enumerate(partition_sizes_as_num_of_samples): - self._node_id_to_size[idx] = partition_size + self._partition_id_to_size[idx] = partition_size - self._check_if_node_id_to_size_possible() + self._check_if_partition_id_to_size_possible() - def _determine_node_id_to_indices_if_needed(self) -> None: + def _determine_partition_id_to_indices_if_needed(self) -> None: """Create an assignment of indices to the partition indices..""" - if self._node_id_to_indices_determined is True: + if self._partition_id_to_indices_determined is True: return - self._determine_node_id_to_size() + self._determine_partition_id_to_size() total_samples_assigned = 0 - for idx, quantity in self._node_id_to_size.items(): - self._node_id_to_indices[idx] = list( + for idx, quantity in self._partition_id_to_size.items(): + self._partition_id_to_indices[idx] = list( range(total_samples_assigned, total_samples_assigned + quantity) ) total_samples_assigned += quantity - self._node_id_to_indices_determined = True + self._partition_id_to_indices_determined = True - def _check_if_node_id_to_size_possible(self) -> None: - all_positive = all(value >= 1 for value in self.node_id_to_size.values()) + def _check_if_partition_id_to_size_possible(self) -> None: + all_positive = all(value >= 1 for value in self.partition_id_to_size.values()) if not all_positive: raise ValueError( f"The given specification of the parameter num_partitions" From 7ac506a871d45a6c42b093132b94d1e6d5aeb193 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 13 Mar 2024 17:54:24 +0000 Subject: [PATCH 5/9] Rename node_id to partition_id --- .../partitioner/dirichlet_partitioner.py | 8 ++- .../partitioner/dirichlet_partitioner_test.py | 4 +- .../inner_dirichlet_partitioner.py | 8 ++- .../partitioner/linear_partitioner.py | 4 +- .../partitioner/natural_id_partitioner.py | 3 +- .../natural_id_partitioner_test.py | 8 ++- .../partitioner/shard_partitioner.py | 22 ++++--- .../partitioner/shard_partitioner_test.py | 64 +++++++++---------- .../partitioner/size_partitioner.py | 4 +- .../partitioner/square_partitioner.py | 4 +- 10 files changed, 76 insertions(+), 53 deletions(-) diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py index 3e083349a1a8..ca0f5a207824 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py @@ -194,7 +194,9 @@ def _initialize_alpha( ) return alpha - def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 + def _determine_partition_id_to_indices_if_needed( + self, + ) -> None: # pylint: disable=R0914 """Create an assignment of indices to the partition indices.""" if self._partition_id_to_indices_determined: return @@ -275,7 +277,9 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab ) if min_sample_size_on_client >= self._min_partition_size: break - sample_sizes = [len(indices) for indices in partition_id_to_indices.values()] + sample_sizes = [ + len(indices) for indices in partition_id_to_indices.values() + ] alpha_not_met = [ self._alpha[i] for i, ss in enumerate(sample_sizes) diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner_test.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner_test.py index 4e9ace1532b8..b2407b5d5822 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner_test.py 
+++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner_test.py @@ -77,7 +77,9 @@ def test_valid_initialization( def test_min_partition_size_requirement(self) -> None: """Test if partitions are created with min partition size required.""" _, partitioner = _dummy_setup(3, 0.5, 100, "labels") - partition_list = [partitioner.load_partition(partition_id) for partition_id in [0, 1, 2]] + partition_list = [ + partitioner.load_partition(partition_id) for partition_id in [0, 1, 2] + ] self.assertTrue( all(len(p) > partitioner._min_partition_size for p in partition_list) ) diff --git a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py index 0bf07acac688..1f9247881194 100644 --- a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py @@ -190,7 +190,9 @@ def _initialize_alpha_if_needed( ) return alpha - def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 + def _determine_partition_id_to_indices_if_needed( + self, + ) -> None: # pylint: disable=R0914 """Create an assignment of indices to the partition indices.""" if self._partition_id_to_indices_determined: return @@ -221,7 +223,9 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab # If current node is full resample a client if partition_id_to_left_to_allocate[current_partition_id] == 0: # When the node is full, exclude it from the sampling nodes list - not_full_partition_ids.pop(not_full_partition_ids.index(current_partition_id)) + not_full_partition_ids.pop( + not_full_partition_ids.index(current_partition_id) + ) continue partition_id_to_left_to_allocate[current_partition_id] -= 1 # Access the label distribution of the chosen client diff --git a/datasets/flwr_datasets/partitioner/linear_partitioner.py b/datasets/flwr_datasets/partitioner/linear_partitioner.py index 5871440dad5e..84d419ab5592 100644 --- a/datasets/flwr_datasets/partitioner/linear_partitioner.py +++ b/datasets/flwr_datasets/partitioner/linear_partitioner.py @@ -32,6 +32,8 @@ class LinearPartitioner(SizePartitioner): """ def __init__(self, num_partitions: int) -> None: - super().__init__(num_partitions=num_partitions, partition_id_to_size_fn=lambda x: x) + super().__init__( + num_partitions=num_partitions, partition_id_to_size_fn=lambda x: x + ) if num_partitions <= 0: raise ValueError("The number of partitions must be greater than zero.") diff --git a/datasets/flwr_datasets/partitioner/natural_id_partitioner.py b/datasets/flwr_datasets/partitioner/natural_id_partitioner.py index 874a251585d0..8bad0668595b 100644 --- a/datasets/flwr_datasets/partitioner/natural_id_partitioner.py +++ b/datasets/flwr_datasets/partitioner/natural_id_partitioner.py @@ -62,7 +62,8 @@ def load_partition(self, partition_id: int) -> datasets.Dataset: self._create_int_partition_id_to_natural_id() return self.dataset.filter( - lambda row: row[self._partition_by] == self._partition_id_to_natural_id[partition_id] + lambda row: row[self._partition_by] + == self._partition_id_to_natural_id[partition_id] ) @property diff --git a/datasets/flwr_datasets/partitioner/natural_id_partitioner_test.py b/datasets/flwr_datasets/partitioner/natural_id_partitioner_test.py index 1677b9f00be9..f447634ad9ed 100644 --- a/datasets/flwr_datasets/partitioner/natural_id_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/natural_id_partitioner_test.py @@ -67,7 +67,9 @@ def 
test_load_partition_num_partitions( _, partitioner = _dummy_setup(num_rows, num_unique_natural_id) # Simulate usage to start lazy partition_id_to_natural_id creation _ = partitioner.load_partition(0) - self.assertEqual(len(partitioner.partition_id_to_natural_id), num_unique_natural_id) + self.assertEqual( + len(partitioner.partition_id_to_natural_id), num_unique_natural_id + ) @parameterized.expand( # type: ignore # num_rows, num_unique_natural_ids @@ -106,7 +108,9 @@ def test_correct_number_of_partitions( """Test if the # of available partitions is equal to # of unique clients.""" _, partitioner = _dummy_setup(num_rows, num_unique_natural_ids) _ = partitioner.load_partition(partition_id=0) - self.assertEqual(len(partitioner.partition_id_to_natural_id), num_unique_natural_ids) + self.assertEqual( + len(partitioner.partition_id_to_natural_id), num_unique_natural_ids + ) def test_cannot_set_partition_id_to_natural_id(self) -> None: """Test the lack of ability to set partition_id_to_natural_id.""" diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner.py b/datasets/flwr_datasets/partitioner/shard_partitioner.py index 514e5ad64567..85698e00f563 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner.py @@ -68,7 +68,7 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 The total number of partitions that the data will be divided into. partition_by : str Column name of the labels (targets) based on which Dirichlet sampling works. - num_shards_per_node : Optional[int] + num_shards_per_partition : Optional[int] Number of shards to assign to a single partitioner. It's an alternative to `num_partitions`. shard_size : Optional[int] @@ -94,7 +94,7 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 >>> from flwr_datasets.partitioner import ShardPartitioner >>> >>> partitioner = ShardPartitioner(num_partitions=10, partition_by="label", - >>> num_shards_per_node=2, shard_size=1_000) + >>> num_shards_per_partition=2, shard_size=1_000) >>> fds = FederatedDataset(dataset="mnist", partitioners={"train": partitioner}) >>> partition = fds.load_partition(0) >>> print(partition[0]) # Print the first example @@ -138,7 +138,7 @@ def __init__( # pylint: disable=R0913 self, num_partitions: int, partition_by: str, - num_shards_per_node: Optional[int] = None, + num_shards_per_partition: Optional[int] = None, shard_size: Optional[int] = None, keep_incomplete_shard: bool = False, shuffle: bool = True, @@ -149,8 +149,8 @@ def __init__( # pylint: disable=R0913 _check_if_natual_number(num_partitions, "num_partitions") self._num_partitions = num_partitions self._partition_by = partition_by - _check_if_natual_number(num_shards_per_node, "num_shards_per_node", True) - self._num_shards_per_node = num_shards_per_node + _check_if_natual_number(num_shards_per_partition, "num_shards_per_partition", True) + self._num_shards_per_node = num_shards_per_partition self._num_shards_used: Optional[int] = None _check_if_natual_number(shard_size, "shard_size", True) self._shard_size = shard_size @@ -194,7 +194,9 @@ def num_partitions(self) -> int: self._determine_partition_id_to_indices_if_needed() return self._num_partitions - def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disable=R0914 + def _determine_partition_id_to_indices_if_needed( + self, + ) -> None: # pylint: disable=R0914 """Assign sample indices to each node id. This method works on sorted datasets. 
A "shard" is a part of the dataset of @@ -205,7 +207,7 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab if self._partition_id_to_indices_determined: return - # One of the specification allows to skip the `num_shards_per_node` param + # One of the specification allows to skip the `num_shards_per_partition` param if self._num_shards_per_node is not None: self._num_shards_used = int( self._num_partitions * self._num_shards_per_node @@ -232,7 +234,7 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab if self._shard_size is None: raise ValueError( "The shard_size needs to be specified if the " - "num_shards_per_node is None" + "num_shards_per_partition is None" ) if self._keep_incomplete_shard is False: self._num_shards_used = int( @@ -300,7 +302,9 @@ def _determine_partition_id_to_indices_if_needed(self) -> None: # pylint: disab for shard_idx in nid_to_shard_indices[partition_id]: start_id = int(shard_idx * self._shard_size) end_id = min(int((shard_idx + 1) * self._shard_size), len(self.dataset)) - partition_id_to_indices[partition_id].extend(list(range(start_id, end_id))) + partition_id_to_indices[partition_id].extend( + list(range(start_id, end_id)) + ) if self._shuffle: for indices in partition_id_to_indices.values(): # In place shuffling diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner_test.py b/datasets/flwr_datasets/partitioner/shard_partitioner_test.py index 62b21fe640c3..d6fa8b529595 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner_test.py @@ -27,7 +27,7 @@ def _dummy_setup( num_rows: int, partition_by: str, num_partitions: int, - num_shards_per_node: Optional[int], + num_shards_per_partition: Optional[int], shard_size: Optional[int], keep_incomplete_shard: bool = False, ) -> Tuple[Dataset, ShardPartitioner]: @@ -39,7 +39,7 @@ def _dummy_setup( dataset = Dataset.from_dict(data) partitioner = ShardPartitioner( num_partitions=num_partitions, - num_shards_per_node=num_shards_per_node, + num_shards_per_partition=num_shards_per_partition, partition_by=partition_by, shard_size=shard_size, keep_incomplete_shard=keep_incomplete_shard, @@ -51,7 +51,7 @@ def _dummy_setup( class TestShardPartitionerSpec1(unittest.TestCase): """Test first possible initialization of ShardPartitioner. - Specify num_shards_per_node and shard_size arguments. + Specify num_shards_per_partition and shard_size arguments. 
""" def test_correct_num_partitions(self) -> None: @@ -59,14 +59,14 @@ def test_correct_num_partitions(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = 3 + num_shards_per_partition = 3 shard_size = 10 keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -79,14 +79,14 @@ def test_correct_partition_sizes(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = 3 + num_shards_per_partition = 3 shard_size = 10 keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -102,14 +102,14 @@ def test_unique_samples(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = 3 + num_shards_per_partition = 3 shard_size = 10 keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -133,14 +133,14 @@ def test_correct_num_partitions(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = None + num_shards_per_partition = None shard_size = 10 keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -153,14 +153,14 @@ def test_correct_partition_sizes(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = None + num_shards_per_partition = None shard_size = 10 keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -176,14 +176,14 @@ def test_unique_samples(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = None + num_shards_per_partition = None shard_size = 10 keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -207,14 +207,14 @@ def test_correct_num_partitions(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = None + num_shards_per_partition = None shard_size = 10 keep_incomplete_shard = True _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -227,14 +227,14 @@ def test_correct_partition_sizes(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = None + num_shards_per_partition = None shard_size = 10 keep_incomplete_shard = True _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -250,14 +250,14 @@ def test_unique_samples(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = None + num_shards_per_partition = None shard_size = 10 keep_incomplete_shard = True _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ 
-272,7 +272,7 @@ def test_unique_samples(self) -> None: class TestShardPartitionerSpec4(unittest.TestCase): """Test fourth possible initialization of ShardPartitioner. - Specify num_shards_per_node but not shard_size arguments. + Specify num_shards_per_partition but not shard_size arguments. """ def test_correct_num_partitions(self) -> None: @@ -280,14 +280,14 @@ def test_correct_num_partitions(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = 3 + num_shards_per_partition = 3 shard_size = None keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -300,14 +300,14 @@ def test_correct_partition_sizes(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = 3 + num_shards_per_partition = 3 shard_size = None keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -323,14 +323,14 @@ def test_unique_samples(self) -> None: partition_by = "label" num_rows = 113 num_partitions = 3 - num_shards_per_node = 3 + num_shards_per_partition = 3 shard_size = None keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -354,14 +354,14 @@ def test_incorrect_specification(self) -> None: partition_by = "label" num_rows = 10 num_partitions = 3 - num_shards_per_node = 2 + num_shards_per_partition = 2 shard_size = 10 keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) @@ -373,14 +373,14 @@ def test_too_big_shard_size(self) -> None: partition_by = "label" num_rows = 20 num_partitions = 3 - num_shards_per_node = None + num_shards_per_partition = None shard_size = 10 keep_incomplete_shard = False _, partitioner = _dummy_setup( num_rows, partition_by, num_partitions, - num_shards_per_node, + num_shards_per_partition, shard_size, keep_incomplete_shard, ) diff --git a/datasets/flwr_datasets/partitioner/size_partitioner.py b/datasets/flwr_datasets/partitioner/size_partitioner.py index 7e1836ca14f5..1c46bd0e6350 100644 --- a/datasets/flwr_datasets/partitioner/size_partitioner.py +++ b/datasets/flwr_datasets/partitioner/size_partitioner.py @@ -33,8 +33,8 @@ class SizePartitioner(Partitioner): If the function doesn't transform the `partition_id` it's a linear correlation between the number of sample for the node and the value of `partition_id`. For - instance, if the node ids range from 1 to M, node with id 1 gets 1 unit of data, - client 2 gets 2 units, and so on, up to node M which gets M units. + instance, if the partition ids range from 1 to M, partition with id 1 gets 1 unit of + data, client 2 gets 2 units, and so on, up to node M which gets M units. Note that size corresponding to the `partition_id` is deterministic, yet in case of different dataset shuffling the assignment of samples to `partition_id` will vary. 
diff --git a/datasets/flwr_datasets/partitioner/square_partitioner.py b/datasets/flwr_datasets/partitioner/square_partitioner.py index 7ff2838d18e6..4c894e47eedf 100644 --- a/datasets/flwr_datasets/partitioner/square_partitioner.py +++ b/datasets/flwr_datasets/partitioner/square_partitioner.py @@ -34,6 +34,8 @@ class SquarePartitioner(SizePartitioner): """ def __init__(self, num_partitions: int) -> None: - super().__init__(num_partitions=num_partitions, partition_id_to_size_fn=np.square) + super().__init__( + num_partitions=num_partitions, partition_id_to_size_fn=np.square + ) if num_partitions <= 0: raise ValueError("The number of partitions must be greater than zero.") From d374bc4c63d95b00f0067a43d2ea18fc57f6237d Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 13 Mar 2024 17:58:58 +0000 Subject: [PATCH 6/9] Rename any node to partition --- .../partitioner/dirichlet_partitioner.py | 22 +++---- .../inner_dirichlet_partitioner.py | 11 ++-- .../partitioner/shard_partitioner.py | 58 +++++++++---------- .../partitioner/size_partitioner.py | 6 +- 4 files changed, 48 insertions(+), 49 deletions(-) diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py index ca0f5a207824..af541666520a 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py @@ -39,10 +39,10 @@ class DirichletPartitioner(Partitioner): even though the alpha stays the same). The notion of balancing is explicitly introduced here (not mentioned in paper but - implemented in the code). It is a mechanism that excludes the node from - assigning new samples to it if the current number of samples on that node exceeds - the average number that the node would get in case of even data distribution. - It is controlled by`self_balancing` parameter. + implemented in the code). It is a mechanism that excludes the partition from + assigning new samples to it if the current number of samples on that partition + exceeds the average number that the partition would get in case of even data + distribution. It is controlled by`self_balancing` parameter. Parameters ---------- @@ -61,7 +61,7 @@ class DirichletPartitioner(Partitioner): paper's code although not mentioned in paper itself). shuffle: bool Whether to randomize the order of samples. Shuffling applied after the - samples assignment to nodes. + samples assignment to partitions. seed: int Seed used for dataset shuffling. It has no effect if `shuffle` is False. 
@@ -109,7 +109,7 @@ def __init__( # pylint: disable=R0913 # Utility attributes # The attributes below are determined during the first call to load_partition - self._avg_num_of_samples_per_node: Optional[float] = None + self._avg_num_of_samples_per_partition: Optional[float] = None self._unique_classes: Optional[Union[List[int], List[str]]] = None self._partition_id_to_indices: Dict[int, List[int]] = {} self._partition_id_to_indices_determined = False @@ -205,7 +205,7 @@ def _determine_partition_id_to_indices_if_needed( self._unique_classes = self.dataset.unique(self._partition_by) assert self._unique_classes is not None # This is needed only if self._self_balancing is True (the default option) - self._avg_num_of_samples_per_node = self.dataset.num_rows / self._num_partitions + self._avg_num_of_samples_per_partition = self.dataset.num_rows / self._num_partitions # Change targets list data type to numpy targets = np.array(self.dataset[self._partition_by]) @@ -214,7 +214,7 @@ def _determine_partition_id_to_indices_if_needed( # min_partition_size is reached. sampling_try = 0 while True: - # Prepare data structure to store indices assigned to node ids + # Prepare data structure to store indices assigned to partition ids partition_id_to_indices: Dict[int, List[int]] = {} for nid in range(self._num_partitions): partition_id_to_indices[nid] = [] @@ -232,16 +232,16 @@ def _determine_partition_id_to_indices_if_needed( nid ] # Balancing (not mentioned in the paper but implemented) - # Do not assign additional samples to the node if it already has more + # Do not assign additional samples to the partition if it already has more # than the average numbers of samples per partition. Note that it might # especially affect classes that are later in the order. This is the # reason for more sparse division that the alpha might suggest. if self._self_balancing: - assert self._avg_num_of_samples_per_node is not None + assert self._avg_num_of_samples_per_partition is not None for nid in nid_to_proportion_of_k_samples.copy(): if ( len(partition_id_to_indices[nid]) - > self._avg_num_of_samples_per_node + > self._avg_num_of_samples_per_partition ): nid_to_proportion_of_k_samples[nid] = 0 diff --git a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py index 1f9247881194..210c354a9b28 100644 --- a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py @@ -49,7 +49,7 @@ class InnerDirichletPartitioner(Partitioner): # pylint: disable=R0902 number of unique classes) shuffle: bool Whether to randomize the order of samples. Shuffling applied after the - samples assignment to nodes. + samples assignment to partitions. seed: int Seed used for dataset shuffling. It has no effect if `shuffle` is False. 
@@ -91,7 +91,6 @@ def __init__( # pylint: disable=R0913 self._num_unique_classes: Optional[int] = None self._num_partitions = len(self._partition_sizes) - # self._avg_num_of_samples_per_node: Optional[float] = None self._partition_id_to_indices: Dict[int, List[int]] = {} self._partition_id_to_indices_determined = False @@ -211,18 +210,18 @@ def _determine_partition_id_to_indices_if_needed( for cid in range(self._num_partitions) ] - # Node id to number of sample left for allocation for that node id + # Node id to number of sample left for allocation for that partition id partition_id_to_left_to_allocate = dict( zip(range(self._num_partitions), self._partition_sizes) ) not_full_partition_ids = list(range(self._num_partitions)) while np.sum(list(partition_id_to_left_to_allocate.values())) != 0: - # Choose a node + # Choose a partition current_partition_id = self._rng.choice(not_full_partition_ids) - # If current node is full resample a client + # If current partition is full resample a client if partition_id_to_left_to_allocate[current_partition_id] == 0: - # When the node is full, exclude it from the sampling nodes list + # When the partition is full, exclude it from the sampling partitions list not_full_partition_ids.pop( not_full_partition_ids.index(current_partition_id) ) diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner.py b/datasets/flwr_datasets/partitioner/shard_partitioner.py index 85698e00f563..797b68add29c 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner.py @@ -31,7 +31,7 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 The algorithm works as follows: the dataset is sorted by label e.g. [samples with label 1, samples with labels 2 ...], then the shards are created, with each shard of size = `shard_size` if provided or automatically calculated: - shards_size = len(dataset) / `num_partitions` * `num_shards_per_node`. + shards_size = len(dataset) / `num_partitions` * `num_shards_per_partition`. A shard is just a block (chunk) of a `dataset` that contains `shard_size` consecutive samples. There might be shards that contain samples associated with more @@ -42,17 +42,17 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 has samples with more than one unique label is when the shard size is bigger than the number of samples of a certain class. - Each partition is created from `num_shards_per_node` that are chosen randomly. + Each partition is created from `num_shards_per_partition` that are chosen randomly. 
There are a few ways of partitioning data that result in certain properties (depending on the parameters specification): - 1) same number of shards per nodes + the same shard size (specify: - a) `num_shards_per_nodes`, `shard_size`; or b) `num_shards_per_node`) + 1) same number of shards per partitions + the same shard size (specify: + a) `num_shards_per_partitions`, `shard_size`; or b) `num_shards_per_partition`) In case of b the `shard_size` is calculated as floor(len(dataset) / - (`num_shards_per_nodes` * `num_partitions`)) - 2) possibly different number of shards per node (use nearly all data) + the same + (`num_shards_per_partitions` * `num_partitions`)) + 2) possibly different number of shards per partition (use nearly all data) + the same shard size (specify: `shard_size` + `keep_incomplete_shard=False`) - 3) possibly different number of shards per node (use all data) + possibly different + 3) possibly different number of shards per partition (use all data) + possibly different shard size (specify: `shard_size` + `keep_incomplete_shard=True`) @@ -79,16 +79,16 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 others). If it is dropped each shard is equal size. (It does not mean that each client gets equal number of shards, which only happens if `num_partitions` % `num_shards` = 0). This parameter has no effect if - `num_shards_per_nodes` and `shard_size` are specified. + `num_shards_per_partitions` and `shard_size` are specified. shuffle: bool Whether to randomize the order of samples. Shuffling applied after the - samples assignment to nodes. + samples assignment to partitions. seed: int Seed used for dataset shuffling. It has no effect if `shuffle` is False. Examples -------- - 1) If you need same number of shards per nodes + the same shard size (and you know + 1) If you need same number of shards per partitions + the same shard size (and you know both of these values) >>> from flwr_datasets import FederatedDataset >>> from flwr_datasets.partitioner import ShardPartitioner @@ -107,7 +107,7 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 [2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000] 2) If you want to use nearly all the data and do not need to have the number of - shard per each node to be the same + shard per each partition to be the same >>> from flwr_datasets import FederatedDataset >>> from flwr_datasets.partitioner import ShardPartitioner >>> @@ -150,7 +150,7 @@ def __init__( # pylint: disable=R0913 self._num_partitions = num_partitions self._partition_by = partition_by _check_if_natual_number(num_shards_per_partition, "num_shards_per_partition", True) - self._num_shards_per_node = num_shards_per_partition + self._num_shards_per_partition = num_shards_per_partition self._num_shards_used: Optional[int] = None _check_if_natual_number(shard_size, "shard_size", True) self._shard_size = shard_size @@ -197,7 +197,7 @@ def num_partitions(self) -> int: def _determine_partition_id_to_indices_if_needed( self, ) -> None: # pylint: disable=R0914 - """Assign sample indices to each node id. + """Assign sample indices to each partition id. This method works on sorted datasets. 
A "shard" is a part of the dataset of consecutive samples (if self._keep_incomplete_shard is False, each shard is same @@ -208,12 +208,12 @@ def _determine_partition_id_to_indices_if_needed( return # One of the specification allows to skip the `num_shards_per_partition` param - if self._num_shards_per_node is not None: + if self._num_shards_per_partition is not None: self._num_shards_used = int( - self._num_partitions * self._num_shards_per_node + self._num_partitions * self._num_shards_per_partition ) - num_shards_per_node_array = ( - np.ones(self._num_partitions) * self._num_shards_per_node + num_shards_per_partition_array = ( + np.ones(self._num_partitions) * self._num_shards_per_partition ) if self._shard_size is None: self._compute_shard_size_if_missing() @@ -230,7 +230,7 @@ def _determine_partition_id_to_indices_if_needed( num_usable_shards_in_dataset = int( math.floor(len(self.dataset) / self._shard_size) ) - elif self._num_shards_per_node is None: + elif self._num_shards_per_partition is None: if self._shard_size is None: raise ValueError( "The shard_size needs to be specified if the " @@ -257,22 +257,22 @@ def _determine_partition_id_to_indices_if_needed( else: raise ValueError( "The keep_incomplete_shards need to be specified " - "when _num_shards_per_node is None." + "when _num_shards_per_partition is None." ) - num_shards_per_node = int(self._num_shards_used / self._num_partitions) - # Assign the shards per nodes (so far, the same as in ideal case) - num_shards_per_node_array = ( - np.ones(self._num_partitions) * num_shards_per_node + num_shards_per_partition = int(self._num_shards_used / self._num_partitions) + # Assign the shards per partitions (so far, the same as in ideal case) + num_shards_per_partition_array = ( + np.ones(self._num_partitions) * num_shards_per_partition ) - num_shards_assigned = self._num_partitions * num_shards_per_node + num_shards_assigned = self._num_partitions * num_shards_per_partition num_shards_to_assign = self._num_shards_used - num_shards_assigned # Assign the "missing" shards for i in range(num_shards_to_assign): - num_shards_per_node_array[i] += 1 + num_shards_per_partition_array[i] += 1 else: raise ValueError( - "The specification of nm_shards_per_node and " + "The specification of nm_shards_per_partition and " "keep_incomplete_shards is not correct." 
) @@ -284,7 +284,7 @@ def _determine_partition_id_to_indices_if_needed( ) indices_on_which_to_split_shards = np.cumsum( - num_shards_per_node_array, dtype=int + num_shards_per_partition_array, dtype=int ) shard_indices_array = self._rng.permutation(num_usable_shards_in_dataset)[ @@ -342,9 +342,9 @@ def _compute_shard_size_if_missing(self) -> None: self._shard_size = int(num_rows / self._num_shards_used) def _check_possibility_of_partitions_creation(self) -> None: - if self._shard_size is not None and self._num_shards_per_node is not None: + if self._shard_size is not None and self._num_shards_per_partition is not None: implied_min_dataset_size = ( - self._shard_size * self._num_shards_per_node * self._num_partitions + self._shard_size * self._num_shards_per_partition * self._num_partitions ) if implied_min_dataset_size > len(self.dataset): raise ValueError( diff --git a/datasets/flwr_datasets/partitioner/size_partitioner.py b/datasets/flwr_datasets/partitioner/size_partitioner.py index 1c46bd0e6350..35937d8b9cc7 100644 --- a/datasets/flwr_datasets/partitioner/size_partitioner.py +++ b/datasets/flwr_datasets/partitioner/size_partitioner.py @@ -32,9 +32,9 @@ class SizePartitioner(Partitioner): `partition_id_to_size_fn(partition_id)` ~ number of samples for `partition_id` If the function doesn't transform the `partition_id` it's a linear correlation - between the number of sample for the node and the value of `partition_id`. For + between the number of sample for the partition and the value of `partition_id`. For instance, if the partition ids range from 1 to M, partition with id 1 gets 1 unit of - data, client 2 gets 2 units, and so on, up to node M which gets M units. + data, client 2 gets 2 units, and so on, up to partition M which gets M units. Note that size corresponding to the `partition_id` is deterministic, yet in case of different dataset shuffling the assignment of samples to `partition_id` will vary. @@ -44,7 +44,7 @@ class SizePartitioner(Partitioner): num_partitions : int The total number of partitions that the data will be divided into. partition_id_to_size_fn : Callable - Function that defines the relationship between node id and the number of + Function that defines the relationship between partition id and the number of samples. """ From a10f40d084e322c468ada15c5feb6e81404e83a5 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 13 Mar 2024 18:01:02 +0000 Subject: [PATCH 7/9] Fix formatting --- .../partitioner/dirichlet_partitioner.py | 14 ++++++++------ .../partitioner/inner_dirichlet_partitioner.py | 2 +- .../partitioner/shard_partitioner.py | 16 +++++++++------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py index af541666520a..25d186fdadd1 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py @@ -41,7 +41,7 @@ class DirichletPartitioner(Partitioner): The notion of balancing is explicitly introduced here (not mentioned in paper but implemented in the code). It is a mechanism that excludes the partition from assigning new samples to it if the current number of samples on that partition - exceeds the average number that the partition would get in case of even data + exceeds the average number that the partition would get in case of even data distribution. It is controlled by`self_balancing` parameter. 
Parameters @@ -205,7 +205,9 @@ def _determine_partition_id_to_indices_if_needed( self._unique_classes = self.dataset.unique(self._partition_by) assert self._unique_classes is not None # This is needed only if self._self_balancing is True (the default option) - self._avg_num_of_samples_per_partition = self.dataset.num_rows / self._num_partitions + self._avg_num_of_samples_per_partition = ( + self.dataset.num_rows / self._num_partitions + ) # Change targets list data type to numpy targets = np.array(self.dataset[self._partition_by]) @@ -232,10 +234,10 @@ def _determine_partition_id_to_indices_if_needed( nid ] # Balancing (not mentioned in the paper but implemented) - # Do not assign additional samples to the partition if it already has more - # than the average numbers of samples per partition. Note that it might - # especially affect classes that are later in the order. This is the - # reason for more sparse division that the alpha might suggest. + # Do not assign additional samples to the partition if it already has + # more than the average numbers of samples per partition. Note that it + # might especially affect classes that are later in the order. This is + # the reason for more sparse division that the alpha might suggest. if self._self_balancing: assert self._avg_num_of_samples_per_partition is not None for nid in nid_to_proportion_of_k_samples.copy(): diff --git a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py index 210c354a9b28..e3e46813dfc8 100644 --- a/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/inner_dirichlet_partitioner.py @@ -221,7 +221,7 @@ def _determine_partition_id_to_indices_if_needed( current_partition_id = self._rng.choice(not_full_partition_ids) # If current partition is full resample a client if partition_id_to_left_to_allocate[current_partition_id] == 0: - # When the partition is full, exclude it from the sampling partitions list + # When the partition is full, exclude it from the sampling list not_full_partition_ids.pop( not_full_partition_ids.index(current_partition_id) ) diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner.py b/datasets/flwr_datasets/partitioner/shard_partitioner.py index 797b68add29c..22a6abb7f47c 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner.py @@ -50,10 +50,10 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 a) `num_shards_per_partitions`, `shard_size`; or b) `num_shards_per_partition`) In case of b the `shard_size` is calculated as floor(len(dataset) / (`num_shards_per_partitions` * `num_partitions`)) - 2) possibly different number of shards per partition (use nearly all data) + the same - shard size (specify: `shard_size` + `keep_incomplete_shard=False`) - 3) possibly different number of shards per partition (use all data) + possibly different - shard size (specify: `shard_size` + `keep_incomplete_shard=True`) + 2) possibly different number of shards per partition (use nearly all data) + the + same shard size (specify: `shard_size` + `keep_incomplete_shard=False`) + 3) possibly different number of shards per partition (use all data) + possibly + different shard size (specify: `shard_size` + `keep_incomplete_shard=True`) Algorithm based on the description in Communication-Efficient Learning of Deep @@ -88,8 +88,8 @@ class ShardPartitioner(Partitioner): # pylint: disable=R0902 Examples 
-------- - 1) If you need same number of shards per partitions + the same shard size (and you know - both of these values) + 1) If you need same number of shards per partitions + the same shard size (and you + know both of these values) >>> from flwr_datasets import FederatedDataset >>> from flwr_datasets.partitioner import ShardPartitioner >>> @@ -149,7 +149,9 @@ def __init__( # pylint: disable=R0913 _check_if_natual_number(num_partitions, "num_partitions") self._num_partitions = num_partitions self._partition_by = partition_by - _check_if_natual_number(num_shards_per_partition, "num_shards_per_partition", True) + _check_if_natual_number( + num_shards_per_partition, "num_shards_per_partition", True + ) self._num_shards_per_partition = num_shards_per_partition self._num_shards_used: Optional[int] = None _check_if_natual_number(shard_size, "shard_size", True) From d33c9092e00fe1f9416c346ab1c1a272d90899af Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 13 Mar 2024 18:11:35 +0000 Subject: [PATCH 8/9] Replace R0914: Too many local variables --- datasets/flwr_datasets/partitioner/dirichlet_partitioner.py | 4 ++-- datasets/flwr_datasets/partitioner/shard_partitioner.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py index 25d186fdadd1..f3feb2174bde 100644 --- a/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py +++ b/datasets/flwr_datasets/partitioner/dirichlet_partitioner.py @@ -25,7 +25,7 @@ from flwr_datasets.partitioner.partitioner import Partitioner -# pylint: disable=R0902, R0912 +# pylint: disable=R0902, R0912, R0914 class DirichletPartitioner(Partitioner): """Partitioner based on Dirichlet distribution. @@ -196,7 +196,7 @@ def _initialize_alpha( def _determine_partition_id_to_indices_if_needed( self, - ) -> None: # pylint: disable=R0914 + ) -> None: """Create an assignment of indices to the partition indices.""" if self._partition_id_to_indices_determined: return diff --git a/datasets/flwr_datasets/partitioner/shard_partitioner.py b/datasets/flwr_datasets/partitioner/shard_partitioner.py index 22a6abb7f47c..a973f7e5bcb9 100644 --- a/datasets/flwr_datasets/partitioner/shard_partitioner.py +++ b/datasets/flwr_datasets/partitioner/shard_partitioner.py @@ -15,7 +15,7 @@ """Shard partitioner class.""" -# pylint: disable=R0912 +# pylint: disable=R0912, R0914 import math from typing import Dict, List, Optional @@ -198,7 +198,7 @@ def num_partitions(self) -> int: def _determine_partition_id_to_indices_if_needed( self, - ) -> None: # pylint: disable=R0914 + ) -> None: """Assign sample indices to each partition id. This method works on sorted datasets. A "shard" is a part of the dataset of From a119cbd76dc9c14a8b1bc96edc89b827b6612627 Mon Sep 17 00:00:00 2001 From: Javier Date: Wed, 13 Mar 2024 19:59:43 +0000 Subject: [PATCH 9/9] Update README.md --- datasets/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/README.md b/datasets/README.md index 61292fe988bf..df00f198f0a8 100644 --- a/datasets/README.md +++ b/datasets/README.md @@ -59,7 +59,7 @@ If you plan to change the type of the dataset to run the code with your ML frame # Usage -Flower Datasets exposes the `FederatedDataset` abstraction to represent the dataset needed for federated learning/evaluation/analytics. It has two powerful methods that let you handle the dataset preprocessing: `load_partition(node_id, split)` and `load_full(split)`. 
+Flower Datasets exposes the `FederatedDataset` abstraction to represent the dataset needed for federated learning/evaluation/analytics. It has two powerful methods that let you handle the dataset preprocessing: `load_partition(partition_id, split)` and `load_full(split)`. Here's a basic quickstart example of how to partition the MNIST dataset:
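The README's own quickstart continues beyond this hunk's context window, so it is not shown here. The sketch below is only an illustration of the renamed API after this patch series: it assumes the `FederatedDataset` constructor accepts an integer shorthand for an IID partitioner (`{"train": 10}`) and that the MNIST dataset exposes standard "train" and "test" splits — these details are assumptions for illustration, not taken verbatim from this patch.

```python
# Illustrative sketch (not the README's verbatim example). Assumes the integer
# partitioner shorthand and the standard "train"/"test" splits of MNIST.
from flwr_datasets import FederatedDataset

# Split the "train" split of MNIST into 10 IID partitions.
fds = FederatedDataset(dataset="mnist", partitioners={"train": 10})

# Load a single partition using the renamed `partition_id` argument
# (previously `node_id`).
partition = fds.load_partition(partition_id=0, split="train")

# Load the full, unpartitioned "test" split for centralized evaluation.
centralized_dataset = fds.load_full("test")

print(partition[0])              # First example of partition 0
print(len(centralized_dataset))  # Size of the full test split
```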