
Tuning for multiple columns part 3: Utility analysis for multiple aggregation #525

Merged
13 commits merged on Sep 12, 2024
50 changes: 38 additions & 12 deletions analysis/data_structures.py
@@ -15,7 +15,7 @@

import copy
import dataclasses
from typing import Iterable, Optional, Sequence, Union
from typing import Iterable, List, Optional, Sequence, Tuple, Union

import pipeline_dp
from pipeline_dp import input_validators
@@ -65,19 +65,41 @@ def __post_init__(self):
raise ValueError(
"All set attributes in MultiParameterConfiguration must have "
"the same length.")
self._size = sizes[0]
if (self.min_sum_per_partition is None) != (self.max_sum_per_partition
is None):
raise ValueError(
"MultiParameterConfiguration: min_sum_per_partition and "
"max_sum_per_partition must be both set or both None.")
self._size = sizes[0]
if self.min_sum_per_partition:
# If elements of min_sum_per_partition and max_sum_per_partition are
# sequences, all of them should have the same size.
def all_elements_are_lists(a: list):
return all([isinstance(b, Sequence) for b in a])

def common_value_len(a: list) -> Optional[int]:
sizes = [len(v) for v in a]
return min(sizes) if min(sizes) == max(sizes) else None

if all_elements_are_lists(
self.min_sum_per_partition) and all_elements_are_lists(
self.max_sum_per_partition):
# multi-column case. Check that each configuration has the
# same number of elements (i.e. columns)
size1 = common_value_len(self.min_sum_per_partition)
size2 = common_value_len(self.max_sum_per_partition)
if size1 is None or size2 is None or size1 != size2:
raise ValueError("If elements of min_sum_per_partition and "
"max_sum_per_partition are sequences, then"
" they must have the same length.")
Contributor: nit: whitespace at the end of line, like above.

Collaborator (Author): This is a multi-line string that doesn't contain newlines, so it doesn't matter where to put the spaces.
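
A minimal illustration of that point (hypothetical strings, not from this PR): Python concatenates adjacent string literals at compile time, so a trailing space on one fragment and a leading space on the next produce the same string.

with_trailing_space = ("they must have the same length, "
                       "wherever the space goes.")
with_leading_space = ("they must have the same length,"
                      " wherever the space goes.")
assert with_trailing_space == with_leading_space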


@property
def size(self):
return self._size

def get_aggregate_params(self, params: pipeline_dp.AggregateParams,
index: int) -> pipeline_dp.AggregateParams:
def get_aggregate_params(
self, params: pipeline_dp.AggregateParams, index: int
) -> Tuple[pipeline_dp.AggregateParams, List[Tuple[float, float]]]:
"""Returns AggregateParams with the index-th parameters."""
params = copy.copy(params)
if self.max_partitions_contributed:
@@ -86,16 +108,20 @@ def get_aggregate_params(self, params: pipeline_dp.AggregateParams,
if self.max_contributions_per_partition:
params.max_contributions_per_partition = self.max_contributions_per_partition[
index]
if self.min_sum_per_partition:
params.min_sum_per_partition = self.min_sum_per_partition[index]
if self.max_sum_per_partition:
params.max_sum_per_partition = self.max_sum_per_partition[index]
if self.noise_kind:
params.noise_kind = self.noise_kind[index]
if self.partition_selection_strategy:
params.partition_selection_strategy = self.partition_selection_strategy[
index]
return params
min_max_sum = []
if self.min_sum_per_partition:
min_sum = self.min_sum_per_partition[index]
max_sum = self.max_sum_per_partition[index]
if isinstance(min_sum, Sequence):
min_max_sum = list(zip(min_sum, max_sum))
else:
min_max_sum = [(min_sum, max_sum)]
return params, min_max_sum
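
A short sketch of the new contract (illustrative values; the exact AggregateParams constructor arguments may differ from the library's): each element of min_sum_per_partition / max_sum_per_partition may itself be a per-column sequence, and get_aggregate_params now also returns one (min, max) pair per sum column.

import pipeline_dp
import analysis

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
    metrics=[pipeline_dp.Metrics.SUM],
    max_partitions_contributed=1,
    max_contributions_per_partition=1,
    min_sum_per_partition=0,
    max_sum_per_partition=1)

# Two parameter configurations, each bounding two sum columns.
config = analysis.MultiParameterConfiguration(
    min_sum_per_partition=[(0, 0), (0, 1)],
    max_sum_per_partition=[(1, 2), (3, 4)])

ith_params, min_max_sum = config.get_aggregate_params(params, 0)
# min_max_sum == [(0, 1), (0, 2)]: one (min, max) pair per sum column.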


@dataclasses.dataclass
@@ -124,12 +150,12 @@ def n_configurations(self):


def get_aggregate_params(
options: UtilityAnalysisOptions
) -> Iterable[pipeline_dp.AggregateParams]:
options: UtilityAnalysisOptions
) -> Iterable[Tuple[pipeline_dp.AggregateParams, List[Tuple[float, float]]]]:
"""Returns AggregateParams which are specified by UtilityAnalysisOptions."""
multi_param_configuration = options.multi_param_configuration
if multi_param_configuration is None:
yield options.aggregate_params
yield options.aggregate_params, []
else:
for i in range(multi_param_configuration.size):
yield multi_param_configuration.get_aggregate_params(
7 changes: 6 additions & 1 deletion analysis/parameter_tuning.py
@@ -244,7 +244,12 @@ def _add_dp_strategy_to_multi_parameter_configuration(
noise_kind: Optional[pipeline_dp.NoiseKind],
strategy_selector: dp_strategy_selector.DPStrategySelector) -> None:
params = [
configuration.get_aggregate_params(blueprint_params, i)
# get_aggregate_params returns a tuple (AggregateParams,
# min_max_sum_per_partitions) for multi-columns. The DP strategy
# (i.e. noise_kind, partition_selection) is independent of
# min_max_sum_per_partitions, so it's fine to just take the
# AggregateParams with [0].
configuration.get_aggregate_params(blueprint_params, i)[0]
for i in range(configuration.size)
]
# Initialize fields corresponding to DP strategy configuration
31 changes: 25 additions & 6 deletions analysis/per_partition_combiners.py
@@ -16,7 +16,7 @@
import abc
import copy
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple
from typing import Any, Iterable, List, Optional, Sequence, Tuple, Union
import numpy as np
import math

@@ -31,7 +31,7 @@

# It corresponds to the aggregating per (privacy_id, partition_key).
# (count, sum, num_partition_privacy_id_contributes).
PreaggregatedData = Tuple[int, float, int]
PreaggregatedData = Tuple[int, Union[float, Sequence[float]], int]
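
For intuition, pre-aggregated tuples under the widened type look like this (made-up values):

# Single sum column: (count, sum, n_partitions).
single_column = (2, 3.5, 100)
# Two sum columns: the middle element becomes one sum per column.
multi_column = (2, (3.5, 40.0), 100)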


class UtilityAnalysisCombiner(pipeline_dp.Combiner):
@@ -236,15 +236,23 @@ class SumCombiner(UtilityAnalysisCombiner):
def __init__(self,
spec: budget_accounting.MechanismSpec,
params: pipeline_dp.AggregateParams,
metric: pipeline_dp.Metrics = pipeline_dp.Metrics.SUM):
metric: pipeline_dp.Metrics = pipeline_dp.Metrics.SUM,
i_column: Optional[int] = None):
self._spec = spec
self._params = copy.deepcopy(params)
self._params = params
self._metric = metric
self._i_column = i_column

def create_accumulator(
self, data: Tuple[np.ndarray, np.ndarray,
np.ndarray]) -> AccumulatorType:
count, partition_sum, n_partitions = data
if self._i_column is not None:
# When i_column is set, this is the multi-column case and this
# combiner processes the i-th column. partition_sum is then a 2d
# np array of shape (n_examples, n_columns), so extract the
# corresponding column.
partition_sum = partition_sum[:, self._i_column]
del count # not used for SumCombiner
min_bound = self._params.min_sum_per_partition
max_bound = self._params.max_sum_per_partition
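
A minimal sketch of the column extraction above, assuming only NumPy and the shapes described in the comment:

import numpy as np

# Sums for 2 contributions across 2 sum columns: rows are examples,
# columns are sum columns.
partition_sum = np.array([[1.0, 10.0],
                          [2.0, 20.0]])
i_column = 1
column_sum = partition_sum[:, i_column]  # array([10., 20.])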
@@ -375,16 +383,27 @@ class CompoundCombiner(pipeline_dp.combiners.CompoundCombiner):
# improvements, on converting from sparse to dense mode, the data are
# converted to NumPy arrays. And internal combiners perform NumPy vector
# aggregations.
SparseAccumulatorType = Tuple[List[int], List[float], List[int]]
SparseAccumulatorType = Tuple[List[int], Union[List[float],
List[Sequence[float]]],
List[int]]
DenseAccumulatorType = List[Any]
AccumulatorType = Tuple[Optional[SparseAccumulatorType],
Optional[DenseAccumulatorType]]

def __init__(self, combiners: Iterable['Combiner'],
n_sum_aggregations: int):
super().__init__(combiners, return_named_tuple=False)
self._n_sum_aggregations = n_sum_aggregations

def create_accumulator(self, data: PreaggregatedData) -> AccumulatorType:
if not data:
# Handle empty partitions. Only encountered when public partitions
# are used.
return (([0], [0], [0]), None)
if self._n_sum_aggregations > 1:
empty_sum = [(0,) * self._n_sum_aggregations]
else:
empty_sum = [0]
return (([0], empty_sum, [0]), None)
return (([data[0]], [data[1]], [data[2]]), None)
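
For illustration, the sparse accumulators this method produces for an empty partition, following the branch above (the shapes match the tests later in this diff):

# n_sum_aggregations == 2: the sum slot holds one tuple of zeros per column.
empty_acc_two_columns = (([0], [(0, 0)], [0]), None)
# n_sum_aggregations <= 1: the sum slot is a plain scalar zero.
empty_acc_single_column = (([0], [0], [0]), None)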

def _to_dense(self,
23 changes: 21 additions & 2 deletions analysis/tests/data_structures_test.py
@@ -70,6 +70,17 @@ class MultiParameterConfiguration(parameterized.TestCase):
max_sum_per_partition=None,
noise_kind=None,
partition_selection_strategy=None),
dict(testcase_name="min_sum_per_partition and max_sum_per_partition"
"have different length elements",
error_msg="If elements of min_sum_per_partition and "
"max_sum_per_partition are sequences, then they must "
"have the same length.",
max_partitions_contributed=None,
max_contributions_per_partition=None,
min_sum_per_partition=[(1, 2), (1,)],
max_sum_per_partition=[(3, 5), (2, 2)],
noise_kind=None,
partition_selection_strategy=None),
)
def test_validation(self, error_msg, max_partitions_contributed,
max_contributions_per_partition, min_sum_per_partition,
@@ -95,18 +106,26 @@ def test_get_aggregate_params(self):
selection_strategy = [
pipeline_dp.PartitionSelectionStrategy.GAUSSIAN_THRESHOLDING
] * 3
max_sum_per_partition = [(1, 2), (3, 4), (5, 6)]
min_sum_per_partition = [(0, 0), (0, 0), (0, 1)]
multi_params = analysis.MultiParameterConfiguration(
max_partitions_contributed=max_partitions_contributed,
noise_kind=noise_kind,
partition_selection_strategy=selection_strategy)
partition_selection_strategy=selection_strategy,
min_sum_per_partition=min_sum_per_partition,
max_sum_per_partition=max_sum_per_partition)
self.assertEqual(3, multi_params.size)

expected_min_max_sum = [[(0, 1), (0, 2)], [(0, 3), (0, 4)],
[(0, 5), (1, 6)]]

for i in range(multi_params.size):
ith_params = multi_params.get_aggregate_params(params, i)
ith_params, min_max = multi_params.get_aggregate_params(params, i)
params.max_partitions_contributed = max_partitions_contributed[i]
params.noise_kind = noise_kind[i]
params.partition_selection_strategy = selection_strategy[i]
self.assertEqual(params, ith_params)
self.assertEqual(min_max, expected_min_max_sum[i])


if __name__ == '__main__':
65 changes: 59 additions & 6 deletions analysis/tests/per_partition_combiners_test.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""UtilityAnalysisCountCombinerTest."""
import copy
import dataclasses

import numpy as np
@@ -58,7 +59,7 @@ def _create_sparse_combiner_acc(
return (counts, sums, n_partitions)


class UtilityAnalysisCountCombinerTest(parameterized.TestCase):
class CountCombinerTest(parameterized.TestCase):

@parameterized.named_parameters(
dict(testcase_name='empty',
@@ -273,7 +274,7 @@ def _create_combiner_params_for_sum(
))


class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
class SumCombinerTest(parameterized.TestCase):

@parameterized.named_parameters(
dict(testcase_name='empty',
@@ -366,6 +367,23 @@ def test_merge(self):
# Test that no type is np.float64
self.assertTrue(_check_none_are_np_float64(merged_acc))

def test_create_accumulator_for_multi_columns(self):
params = _create_combiner_params_for_sum(0, 5)
combiner = combiners.SumCombiner(*params, i_column=1)
data = (np.array([1, 1]), np.array([[1, 10],
[2, 20]]), np.array([100, 150]))
partition_sum, clipping_to_min_error, clipping_to_max_error, expected_l0_bounding_error, var_cross_partition_error = combiner.create_accumulator(
data)
self.assertEqual(partition_sum, 30)
self.assertEqual(clipping_to_min_error, 0)
self.assertEqual(clipping_to_max_error, -20)
self.assertAlmostEqual(expected_l0_bounding_error,
-9.91666667,
delta=1e-8)
self.assertAlmostEqual(var_cross_partition_error,
0.41305556,
delta=1e-8)


def _create_combiner_params_for_privacy_id_count() -> Tuple[
pipeline_dp.budget_accounting.MechanismSpec, pipeline_dp.AggregateParams]:
@@ -381,7 +399,7 @@ def _create_combiner_params_for_privacy_id_count() -> Tuple[
))


class UtilityAnalysisPrivacyIdCountCombinerTest(parameterized.TestCase):
class PrivacyIdCountCombinerTest(parameterized.TestCase):

@parameterized.named_parameters(
dict(testcase_name='empty',
@@ -463,15 +481,33 @@ def test_merge(self):
self.assertTrue(_check_none_are_np_float64(merged_acc))


class UtilityAnalysisCompoundCombinerTest(parameterized.TestCase):
class CompoundCombinerTest(parameterized.TestCase):

def _create_combiner(self) -> combiners.CompoundCombiner:
mechanism_spec, params = _create_combiner_params_for_count()
count_combiner = combiners.CountCombiner(mechanism_spec, params)
return combiners.CompoundCombiner([count_combiner],
return_named_tuple=False)
n_sum_aggregations=0)

def _create_combiner_2_columns(self) -> combiners.CompoundCombiner:
mechanism_spec, params1 = _create_combiner_params_for_sum(0, 1)
sum_combiner1 = combiners.SumCombiner(mechanism_spec,
params1,
i_column=0)
params2 = copy.deepcopy(params1)
params2.max_sum_per_partition = 5
sum_combiner2 = combiners.SumCombiner(mechanism_spec,
params2,
i_column=1)
return combiners.CompoundCombiner([sum_combiner1, sum_combiner2],
n_sum_aggregations=2)

def test_create_accumulator_empty_data_multi_columns(self):
sparse, dense = self._create_combiner_2_columns().create_accumulator(())
self.assertEqual(sparse, ([0], [(0, 0)], [0]))
self.assertIsNone(dense)

def test_create_accumulator_empty_data(self):
sparse, dense = self._create_combiner().create_accumulator(())
self.assertEqual(sparse, ([0], [0], [0]))
self.assertIsNone(dense)
@@ -485,6 +521,13 @@ def test_create_accumulator(self):
self.assertEqual(([len(data)], [sum(data)], [n_partitions]), sparse)
self.assertIsNone(dense)

def test_create_accumulator_2_sum_columns(self):
combiner = self._create_combiner_2_columns()
pre_aggregate_data = [1, [2, 3], 4] # count, sum, n_partitions
sparse, dense = combiner.create_accumulator(pre_aggregate_data)
self.assertEqual(([1], [[2, 3]], [4]), sparse)
self.assertIsNone(dense)

def test_to_dense(self):
combiner = self._create_combiner()
sparse_acc = ([1, 3], [10, 20], [100, 200])
@@ -493,6 +536,16 @@ def test_to_dense(self):
self.assertEqual(2, num_privacy_ids)
self.assertSequenceEqual((4, 0, -1.0, -2.98, 0.0298), count_acc)

def test_to_dense_2_columns(self):
combiner = self._create_combiner_2_columns()
sparse_acc = ([1, 3], [(10, 20), (100, 200)], [100, 200])
dense = combiner._to_dense(sparse_acc)
num_privacy_ids, (sum1_acc, sum2_acc) = dense
self.assertEqual(2, num_privacy_ids)
self.assertSequenceEqual(
(110, 0, -108, -1.9849999999999999, 0.014875000000000001), sum1_acc)
self.assertSequenceEqual((220, 0, -210, -9.925, 0.371875), sum2_acc)

def test_merge_sparse(self):
combiner = self._create_combiner()
sparse_acc1 = ([1], [10], [100])
@@ -611,7 +664,7 @@ def test_two_internal_combiners(self):
sum_mechanism_spec, sum_params = _create_combiner_params_for_sum(0, 5)
sum_combiner = combiners.SumCombiner(sum_mechanism_spec, sum_params)
combiner = combiners.CompoundCombiner([count_combiner, sum_combiner],
return_named_tuple=False)
n_sum_aggregations=1)

data, n_partitions = [1, 2, 3], 100
acc = combiner.create_accumulator((len(data), sum(data), n_partitions))
4 changes: 3 additions & 1 deletion analysis/utility_analysis.py
@@ -160,7 +160,9 @@ def _pack_per_partition_metrics(
where each element corresponds to one of the configurations of the input
parameters.
"""
n_metrics = len(utility_result) // n_configurations
assert (len(utility_result) - 1) % n_configurations == 0
n_metrics = (len(utility_result) -
1) // n_configurations # -1 because of raw_statistics
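# Worked example (illustrative sizes): with raw_statistics at index 0 and
# 3 configurations x 2 metrics, len(utility_result) == 1 + 3 * 2 == 7,
# so n_metrics == (7 - 1) // 3 == 2.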

raw_statistics = utility_result[0]
# Create 'result' with empty elements.
Expand Down