Implement 1 BudgetAccountant per configuration in utility analysis (#…
dvadym authored Nov 21, 2023
1 parent cac530f commit 36fc26c
Showing 3 changed files with 103 additions and 27 deletions.
69 changes: 67 additions & 2 deletions analysis/tests/utility_analysis_engine_test.py
@@ -299,8 +299,73 @@ def test_multi_parameters(self):

self.assertSequenceEqual(expected_pk0, output[0][1])
self.assertSequenceEqual(expected_pk1, output[1][1])
# Check that the number of budget requests equals the number of metrics.
self.assertLen(budget_accountant._mechanisms, 1)

def test_multi_parameters_different_noise_kind(self):
# Arrange
aggregate_params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
metrics=[pipeline_dp.Metrics.COUNT],
max_partitions_contributed=1,
max_contributions_per_partition=1)

multi_param = analysis.MultiParameterConfiguration(
max_partitions_contributed=[1, 2],
max_contributions_per_partition=[1, 2],
noise_kind=[
pipeline_dp.NoiseKind.LAPLACE, pipeline_dp.NoiseKind.GAUSSIAN
])

budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
total_delta=1e-10)

engine = utility_analysis_engine.UtilityAnalysisEngine(
budget_accountant=budget_accountant,
backend=pipeline_dp.LocalBackend())

# Input collection has 1 privacy id, which contributes to 2 partitions
# 1 and 2 times respectively.
input = [(0, "pk0"), (0, "pk1"), (0, "pk1")]
data_extractors = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda x: x[0],
partition_extractor=lambda x: x[1],
value_extractor=lambda x: 0)

options = analysis.UtilityAnalysisOptions(
epsilon=1,
delta=0,
aggregate_params=aggregate_params,
multi_param_configuration=multi_param)
output = engine.analyze(input,
options=options,
data_extractors=data_extractors,
public_partitions=None)

output = list(output)
self.assertLen(output, 2) # 2 partitions

expected_pk0 = [
metrics.RawStatistics(privacy_id_count=1, count=1),
5e-11, # Probability that the partition is kept
metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT,
sum=1.0,
clipping_to_min_error=0.0,
clipping_to_max_error=0.0,
expected_l0_bounding_error=-0.5,
std_l0_bounding_error=0.5,
std_noise=2.8284271247461903,
noise_kind=pipeline_dp.NoiseKind.LAPLACE),
2.50000000003125e-11, # Probability that the partition is kept
metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT,
sum=1.0,
clipping_to_min_error=0.0,
clipping_to_max_error=0.0,
expected_l0_bounding_error=0,
std_l0_bounding_error=0.0,
std_noise=32.99095075973487,
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)
]

self.assertSequenceEqual(expected_pk0, output[0][1])

@patch('pipeline_dp.sampling_utils.ValueSampler.__init__')
def test_partition_sampling(self, mock_sampler_init):
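As a rough sanity check of the `std_noise` expected for the first (Laplace) configuration in `expected_pk0` above: assuming the deep-copied NaiveBudgetAccountant splits total_epsilon=1 equally between the partition-selection request and the COUNT request (an assumption, not stated in the diff), COUNT gets epsilon = 0.5, and with max_partitions_contributed = max_contributions_per_partition = 1 the L1 sensitivity of COUNT is 1, so the Laplace standard deviation is sqrt(2) * sensitivity / epsilon:

import math

# Hedged back-of-the-envelope check of the Laplace std_noise above.
# Assumption (not part of the diff): epsilon=1 is split equally between the
# partition-selection request and the COUNT request, so COUNT gets 0.5.
sensitivity = 1 * 1  # max_partitions_contributed * max_contributions_per_partition
epsilon_for_count = 0.5
laplace_scale = sensitivity / epsilon_for_count
print(math.sqrt(2) * laplace_scale)  # 2.8284271247461903, matching std_noise

The Gaussian std_noise (32.99…) for the second configuration comes from pipeline_dp's Gaussian calibration and is not reproduced here.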
45 changes: 25 additions & 20 deletions analysis/utility_analysis_engine.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""DPEngine for utility analysis."""

import copy
from typing import Optional, Union

import pipeline_dp
@@ -78,8 +78,9 @@ def analyze(self,

# Build the computation graph from the parent class by calling
# aggregate().
result = super().aggregate(col, options.aggregate_params,
data_extractors, public_partitions)
self._add_report_generator(options.aggregate_params, "analyze")
result = super()._aggregate(col, options.aggregate_params,
data_extractors, public_partitions)

self._is_public_partitions = None
self._options = None
@@ -98,41 +99,45 @@ def _create_contribution_bounder(
def _create_compound_combiner(
self, aggregate_params: pipeline_dp.AggregateParams
) -> combiners.CompoundCombiner:
mechanism_type = aggregate_params.noise_kind.convert_to_mechanism_type()
# Compute budgets
# 1. For private partition selection.
if not self._is_public_partitions:
private_partition_selection_budget = self._budget_accountant.request_budget(
pipeline_dp.MechanismType.GENERIC,
weight=aggregate_params.budget_weight)
# 2. For metrics.
budgets = {}
for metric in aggregate_params.metrics:
budgets[metric] = self._budget_accountant.request_budget(
mechanism_type, weight=aggregate_params.budget_weight)

# Create Utility analysis combiners.
internal_combiners = [per_partition_combiners.RawStatisticsCombiner()]
for params in data_structures.get_aggregate_params(self._options):
# Each parameter configuration has its own BudgetAccountant, which allows
# different mechanisms to be used in different configurations.
budget_accountant = copy.deepcopy(self._budget_accountant)

mechanism_type = None
if params.noise_kind is None:
# This is the select-partitions case.
assert not aggregate_params.metrics, \
f"Noise kind should be given when " \
f"{aggregate_params.metrics[0]} is analyzed"
else:
mechanism_type = params.noise_kind.convert_to_mechanism_type()
# WARNING: Do not change the order here,
# _create_aggregate_error_compound_combiner() in utility_analysis.py
# depends on it.
if not self._is_public_partitions:
internal_combiners.append(
per_partition_combiners.PartitionSelectionCombiner(
private_partition_selection_budget, params))
budget_accountant.request_budget(
pipeline_dp.MechanismType.GENERIC), params))
if pipeline_dp.Metrics.SUM in aggregate_params.metrics:
internal_combiners.append(
per_partition_combiners.SumCombiner(
budgets[pipeline_dp.Metrics.SUM], params))
budget_accountant.request_budget(mechanism_type),
params))
if pipeline_dp.Metrics.COUNT in aggregate_params.metrics:
internal_combiners.append(
per_partition_combiners.CountCombiner(
budgets[pipeline_dp.Metrics.COUNT], params))
budget_accountant.request_budget(mechanism_type),
params))
if pipeline_dp.Metrics.PRIVACY_ID_COUNT in aggregate_params.metrics:
internal_combiners.append(
per_partition_combiners.PrivacyIdCountCombiner(
budgets[pipeline_dp.Metrics.PRIVACY_ID_COUNT], params))
budget_accountant.request_budget(mechanism_type),
params))
budget_accountant.compute_budgets()

return per_partition_combiners.CompoundCombiner(
internal_combiners, return_named_tuple=False)
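The essential change in `_create_compound_combiner` above is that every parameter configuration now gets its own deep copy of the engine's BudgetAccountant, which is what lets one configuration use Laplace noise and another Gaussian. A minimal sketch of that pattern, mirroring the loop in the diff (the helper name and the shape of `configurations` are illustrative, not part of the commit):

import copy
import pipeline_dp

def request_budgets_per_configuration(base_accountant, configurations):
    # Illustrative sketch only: mirrors the per-configuration loop in
    # _create_compound_combiner; not part of the commit.
    per_config_budgets = []
    for config in configurations:
        # One BudgetAccountant per configuration.
        accountant = copy.deepcopy(base_accountant)
        # Partition selection always uses the GENERIC mechanism type.
        selection_budget = accountant.request_budget(
            pipeline_dp.MechanismType.GENERIC)
        metric_budget = None
        if config.noise_kind is not None:
            # Each configuration may map to a different mechanism type.
            metric_budget = accountant.request_budget(
                config.noise_kind.convert_to_mechanism_type())
        accountant.compute_budgets()
        per_config_budgets.append((selection_budget, metric_budget))
    return per_config_budgets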
16 changes: 11 additions & 5 deletions pipeline_dp/dp_engine.py
@@ -40,6 +40,14 @@ def __init__(self, budget_accountant: 'BudgetAccountant',
def _current_report_generator(self):
return self._report_generators[-1]

def _add_report_generator(self,
params,
method_name: str,
is_public_partition: Optional[bool] = None):
self._report_generators.append(
report_generator.ReportGenerator(params, method_name,
is_public_partition))

def _add_report_stage(self, stage_description):
self._current_report_generator.add_stage(stage_description)

@@ -86,9 +94,8 @@ def aggregate(self,
params.custom_combiners is not None)

with self._budget_accountant.scope(weight=params.budget_weight):
self._report_generators.append(
report_generator.ReportGenerator(params, "aggregate",
public_partitions is not None))
self._add_report_generator(params, "aggregate", public_partitions
is not None)
if out_explain_computation_report is not None:
out_explain_computation_report._set_report_generator(
self._current_report_generator)
@@ -217,8 +224,7 @@ def select_partitions(self, col, params: pipeline_dp.SelectPartitionsParams,
self._check_budget_accountant_compatibility(False, [], False)

with self._budget_accountant.scope(weight=params.budget_weight):
self._report_generators.append(
report_generator.ReportGenerator(params, "select_partitions"))
self._add_report_generator(params, "select_partitions")
col = self._select_partitions(col, params, data_extractors)
budget = self._budget_accountant._compute_budget_for_aggregation(
params.budget_weight)
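For reference, the new `_add_report_generator` helper replaces the two-line `self._report_generators.append(report_generator.ReportGenerator(...))` pattern. Consolidated from the hunks above, the call sites touched by this commit look like this (each line sits inside the method named in its trailing comment):

self._add_report_generator(options.aggregate_params, "analyze")   # UtilityAnalysisEngine.analyze
self._add_report_generator(params, "aggregate",
                           public_partitions is not None)         # DPEngine.aggregate
self._add_report_generator(params, "select_partitions")           # DPEngine.select_partitions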
