From 36fc26c25442619e03f652f4b01d4a17a6266bb7 Mon Sep 17 00:00:00 2001
From: Vadym Doroshenko <53558779+dvadym@users.noreply.github.com>
Date: Tue, 21 Nov 2023 17:19:28 +0100
Subject: [PATCH] Implement one BudgetAccountant per configuration in utility
 analysis (#507)

---
 .../tests/utility_analysis_engine_test.py | 69 ++++++++++++++++++-
 analysis/utility_analysis_engine.py       | 45 ++++++------
 pipeline_dp/dp_engine.py                  | 16 +++--
 3 files changed, 103 insertions(+), 27 deletions(-)

diff --git a/analysis/tests/utility_analysis_engine_test.py b/analysis/tests/utility_analysis_engine_test.py
index fef519b7..d0410b3f 100644
--- a/analysis/tests/utility_analysis_engine_test.py
+++ b/analysis/tests/utility_analysis_engine_test.py
@@ -299,8 +299,73 @@ def test_multi_parameters(self):
 
         self.assertSequenceEqual(expected_pk0, output[0][1])
         self.assertSequenceEqual(expected_pk1, output[1][1])
-        # Check that the number of budget requests equal to number of metrics.
-        self.assertLen(budget_accountant._mechanisms, 1)
+
+    def test_multi_parameters_different_noise_kind(self):
+        # Arrange
+        aggregate_params = pipeline_dp.AggregateParams(
+            noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
+            metrics=[pipeline_dp.Metrics.COUNT],
+            max_partitions_contributed=1,
+            max_contributions_per_partition=1)
+
+        multi_param = analysis.MultiParameterConfiguration(
+            max_partitions_contributed=[1, 2],
+            max_contributions_per_partition=[1, 2],
+            noise_kind=[
+                pipeline_dp.NoiseKind.LAPLACE, pipeline_dp.NoiseKind.GAUSSIAN
+            ])
+
+        budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
+                                                              total_delta=1e-10)
+
+        engine = utility_analysis_engine.UtilityAnalysisEngine(
+            budget_accountant=budget_accountant,
+            backend=pipeline_dp.LocalBackend())
+
+        # Input collection has 1 privacy id, which contributes to 2 partitions
+        # 1 and 2 times respectively.
+ input = [(0, "pk0"), (0, "pk1"), (0, "pk1")] + data_extractors = pipeline_dp.DataExtractors( + privacy_id_extractor=lambda x: x[0], + partition_extractor=lambda x: x[1], + value_extractor=lambda x: 0) + + options = analysis.UtilityAnalysisOptions( + epsilon=1, + delta=0, + aggregate_params=aggregate_params, + multi_param_configuration=multi_param) + output = engine.analyze(input, + options=options, + data_extractors=data_extractors, + public_partitions=None) + + output = list(output) + self.assertLen(output, 2) # 2 partitions + + expected_pk0 = [ + metrics.RawStatistics(privacy_id_count=1, count=1), + 5e-11, # Probability that the partition is kept + metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT, + sum=1.0, + clipping_to_min_error=0.0, + clipping_to_max_error=0.0, + expected_l0_bounding_error=-0.5, + std_l0_bounding_error=0.5, + std_noise=2.8284271247461903, + noise_kind=pipeline_dp.NoiseKind.LAPLACE), + 2.50000000003125e-11, # Probability that the partition is kept + metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT, + sum=1.0, + clipping_to_min_error=0.0, + clipping_to_max_error=0.0, + expected_l0_bounding_error=0, + std_l0_bounding_error=0.0, + std_noise=32.99095075973487, + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN) + ] + + self.assertSequenceEqual(expected_pk0, output[0][1]) @patch('pipeline_dp.sampling_utils.ValueSampler.__init__') def test_partition_sampling(self, mock_sampler_init): diff --git a/analysis/utility_analysis_engine.py b/analysis/utility_analysis_engine.py index 34ade64b..3c0309c8 100644 --- a/analysis/utility_analysis_engine.py +++ b/analysis/utility_analysis_engine.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """DPEngine for utility analysis.""" - +import copy from typing import Optional, Union import pipeline_dp @@ -78,8 +78,9 @@ def analyze(self, # Build the computation graph from the parent class by calling # aggregate(). - result = super().aggregate(col, options.aggregate_params, - data_extractors, public_partitions) + self._add_report_generator(options.aggregate_params, "analyze") + result = super()._aggregate(col, options.aggregate_params, + data_extractors, public_partitions) self._is_public_partitions = None self._options = None @@ -98,41 +99,45 @@ def _create_contribution_bounder( def _create_compound_combiner( self, aggregate_params: pipeline_dp.AggregateParams ) -> combiners.CompoundCombiner: - mechanism_type = aggregate_params.noise_kind.convert_to_mechanism_type() - # Compute budgets - # 1. For private partition selection. - if not self._is_public_partitions: - private_partition_selection_budget = self._budget_accountant.request_budget( - pipeline_dp.MechanismType.GENERIC, - weight=aggregate_params.budget_weight) - # 2. For metrics. - budgets = {} - for metric in aggregate_params.metrics: - budgets[metric] = self._budget_accountant.request_budget( - mechanism_type, weight=aggregate_params.budget_weight) - # Create Utility analysis combiners. internal_combiners = [per_partition_combiners.RawStatisticsCombiner()] for params in data_structures.get_aggregate_params(self._options): + # Each parameter configuration has own BudgetAccountant which allows + # different mechanisms to be used in different configurations. + budget_accountant = copy.deepcopy(self._budget_accountant) + + mechanism_type = None + if params.noise_kind is None: + # This is select partition case. 
+                assert not aggregate_params.metrics, \
+                    f"Noise kind should be given when " \
+                    f"{aggregate_params.metrics[0]} is analyzed"
+            else:
+                mechanism_type = params.noise_kind.convert_to_mechanism_type()
             # WARNING: Do not change the order here,
             # _create_aggregate_error_compound_combiner() in utility_analysis.py
             # depends on it.
             if not self._is_public_partitions:
                 internal_combiners.append(
                     per_partition_combiners.PartitionSelectionCombiner(
-                        private_partition_selection_budget, params))
+                        budget_accountant.request_budget(
+                            pipeline_dp.MechanismType.GENERIC), params))
             if pipeline_dp.Metrics.SUM in aggregate_params.metrics:
                 internal_combiners.append(
                     per_partition_combiners.SumCombiner(
-                        budgets[pipeline_dp.Metrics.SUM], params))
+                        budget_accountant.request_budget(mechanism_type),
+                        params))
             if pipeline_dp.Metrics.COUNT in aggregate_params.metrics:
                 internal_combiners.append(
                     per_partition_combiners.CountCombiner(
-                        budgets[pipeline_dp.Metrics.COUNT], params))
+                        budget_accountant.request_budget(mechanism_type),
+                        params))
             if pipeline_dp.Metrics.PRIVACY_ID_COUNT in aggregate_params.metrics:
                 internal_combiners.append(
                     per_partition_combiners.PrivacyIdCountCombiner(
-                        budgets[pipeline_dp.Metrics.PRIVACY_ID_COUNT], params))
+                        budget_accountant.request_budget(mechanism_type),
+                        params))
+            budget_accountant.compute_budgets()
 
         return per_partition_combiners.CompoundCombiner(
             internal_combiners, return_named_tuple=False)
diff --git a/pipeline_dp/dp_engine.py b/pipeline_dp/dp_engine.py
index 6fb2b613..37955eb2 100644
--- a/pipeline_dp/dp_engine.py
+++ b/pipeline_dp/dp_engine.py
@@ -40,6 +40,14 @@ def __init__(self, budget_accountant: 'BudgetAccountant',
     def _current_report_generator(self):
         return self._report_generators[-1]
 
+    def _add_report_generator(self,
+                              params,
+                              method_name: str,
+                              is_public_partition: Optional[bool] = None):
+        self._report_generators.append(
+            report_generator.ReportGenerator(params, method_name,
+                                             is_public_partition))
+
     def _add_report_stage(self, stage_description):
         self._current_report_generator.add_stage(stage_description)
 
@@ -86,9 +94,8 @@ def aggregate(self,
                 params.custom_combiners is not None)
 
         with self._budget_accountant.scope(weight=params.budget_weight):
-            self._report_generators.append(
-                report_generator.ReportGenerator(params, "aggregate",
-                                                 public_partitions is not None))
+            self._add_report_generator(params, "aggregate", public_partitions
+                                       is not None)
             if out_explain_computation_report is not None:
                 out_explain_computation_report._set_report_generator(
                     self._current_report_generator)
@@ -217,8 +224,7 @@ def select_partitions(self, col, params: pipeline_dp.SelectPartitionsParams,
         self._check_budget_accountant_compatibility(False, [], False)
 
         with self._budget_accountant.scope(weight=params.budget_weight):
-            self._report_generators.append(
-                report_generator.ReportGenerator(params, "select_partitions"))
+            self._add_report_generator(params, "select_partitions")
             col = self._select_partitions(col, params, data_extractors)
             budget = self._budget_accountant._compute_budget_for_aggregation(
                 params.budget_weight)
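
For context, a minimal usage sketch (not part of the patch) of what this change
enables, mirroring the new test above: each entry in MultiParameterConfiguration
may now use its own noise_kind, because the utility analysis engine now copies
the BudgetAccountant per configuration. Import paths, LocalBackend and
NaiveBudgetAccountant are taken from the test file and assumed to be available
as shown there.

    import pipeline_dp
    import analysis
    from analysis import utility_analysis_engine

    # Base parameters; noise_kind is overridden per configuration below.
    aggregate_params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        metrics=[pipeline_dp.Metrics.COUNT],
        max_partitions_contributed=1,
        max_contributions_per_partition=1)

    # Two configurations analyzed in one pass: Laplace noise for the first,
    # Gaussian noise for the second.
    multi_param = analysis.MultiParameterConfiguration(
        max_partitions_contributed=[1, 2],
        max_contributions_per_partition=[1, 2],
        noise_kind=[pipeline_dp.NoiseKind.LAPLACE, pipeline_dp.NoiseKind.GAUSSIAN])

    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
                                                          total_delta=1e-10)
    engine = utility_analysis_engine.UtilityAnalysisEngine(
        budget_accountant=budget_accountant,
        backend=pipeline_dp.LocalBackend())

    data = [(0, "pk0"), (0, "pk1"), (0, "pk1")]
    data_extractors = pipeline_dp.DataExtractors(
        privacy_id_extractor=lambda row: row[0],
        partition_extractor=lambda row: row[1],
        value_extractor=lambda row: 0)

    options = analysis.UtilityAnalysisOptions(
        epsilon=1,
        delta=0,
        aggregate_params=aggregate_params,
        multi_param_configuration=multi_param)

    # Each output partition carries one SumMetrics per configuration, each with
    # the noise_kind of its own configuration.
    output = list(
        engine.analyze(data,
                       options=options,
                       data_extractors=data_extractors,
                       public_partitions=None))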