Introduce boolean variable for enabling/disabling of cross partition contribution bounding #515

Merged
Changes from 3 commits
6 changes: 6 additions & 0 deletions pipeline_dp/aggregate_params.py
@@ -234,6 +234,11 @@ class AggregateParams:
is ignored when public partitions are used.
More details on pre-thresholding are in
https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
perform_cross_partition_contribution_bounding: whether to perform cross
partition contribution bounding.
Warning: turn off cross partition contribution bounding only when the
number of contributed partitions per privacy unit is already bounded
by max_partitions_contributed.
"""
metrics: List[Metric]
noise_kind: NoiseKind = NoiseKind.LAPLACE
@@ -254,6 +259,7 @@ class AggregateParams:
partition_selection_strategy: PartitionSelectionStrategy = PartitionSelectionStrategy.TRUNCATED_GEOMETRIC
pre_threshold: Optional[int] = None
post_aggregation_thresholding: bool = False
perform_cross_partition_contribution_bounding: bool = True

@property
def metrics_str(self) -> str:
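The new flag defaults to True, so existing pipelines keep cross-partition bounding. A minimal usage sketch (parameter values are illustrative, mirroring the tests later in this PR, not a recommendation):

import pipeline_dp

# Safe only if each privacy unit is already known to contribute to at most
# max_partitions_contributed partitions (see the warning in the docstring).
params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
    metrics=[pipeline_dp.Metrics.SUM],
    min_value=0,
    max_value=1,
    max_partitions_contributed=1,
    max_contributions_per_partition=1,
    perform_cross_partition_contribution_bounding=False)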
55 changes: 53 additions & 2 deletions pipeline_dp/contribution_bounders.py
@@ -21,6 +21,12 @@
from pipeline_dp import pipeline_backend
from pipeline_dp import sampling_utils

# TODO(dvadym):
# 1. rename ContributionBounder -> ContributionSampler, because all those
# classes do contribution bounding only by sampling.
# 2. Introduce L0/Linf/L1 sampling in names (the current names are too long
# and not readable).
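# Background on the norm names used in this TODO (standard DP contribution-
# bounding terminology, not part of this diff):
#   L0   - number of distinct partitions a privacy unit contributes to
#          (bounded by max_partitions_contributed, the cross-partition bound).
#   Linf - maximum number of contributions to any single partition
#          (bounded by max_contributions_per_partition, the per-partition bound).
#   L1   - total number of contributions across all partitions
#          (at most L0 * Linf).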


class ContributionBounder(abc.ABC):
"""Interface for objects which perform contribution bounding."""
@@ -76,8 +82,8 @@ def bound_contributions(self, col, params, backend, report_generator,
"Sample per (privacy_id, partition_key)")
report_generator.add_stage(
f"Per-partition contribution bounding: for each privacy_id and each"
f"partition, randomly select max(actual_contributions_per_partition"
f", {max_contributions_per_partition}) contributions.")
f" partition, randomly select max(actual_contributions_per_partitio"
f"n, {max_contributions_per_partition}) contributions.")
# ((privacy_id, partition_key), [value])
col = backend.map_values(
col, aggregate_fn,
@@ -195,6 +201,51 @@ def rekey_per_privacy_id_per_partition_key(pid_pk_v_values):
"Apply aggregate_fn after cross-partition contribution bounding")

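The hunk above fixes a missing space: adjacent f-string literals are concatenated with no separator, so the space must live inside one of the literals. A standalone illustration (not from the diff):

n = 3
broken = (f"Per-partition contribution bounding: for each privacy_id and each"
          f"partition, randomly select max(..., {n}) contributions.")
fixed = (f"Per-partition contribution bounding: for each privacy_id and each"
         f" partition, randomly select max(..., {n}) contributions.")
assert "eachpartition" in broken  # the bug: words run together
assert "each partition" in fixed  # the fix: leading space in the literal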

class LinfSampler(ContributionBounder):
"""Bounds the contribution of privacy_id per partition.

It ensures that each privacy_id contributes at most
max_contributions_per_partition records to each partition (per-partition
contribution bounding), by sampling if needed.
"""

def bound_contributions(self, col, params, backend, report_generator,
aggregate_fn):
col = backend.map_tuple(
col, lambda pid, pk, v: ((pid, pk), v),
"Rekey to ((privacy_id, partition_key), value)")

col = backend.sample_fixed_per_key(
col, params.max_contributions_per_partition,
"Sample per (privacy_id, partition_key)")
# ((privacy_id, partition_key), value)

report_generator.add_stage(
f"Per-partition contribution bounding: for each privacy_id and each"
f" partition, randomly select max(actual_contributions_per_partitio"
f"n, {params.max_contributions_per_partition}) contributions.")

return backend.map_values(
col, aggregate_fn,
"Apply aggregate_fn after cross-partition contribution bounding")


class NoOpSampler(ContributionBounder):
"""Does no sampling."""

def bound_contributions(self, col, params, backend, report_generator,
aggregate_fn):
col = backend.map_tuple(
col, lambda pid, pk, v: ((pid, pk), v),
"Rekey to ((privacy_id, partition_key), value)")
# ((privacy_id, partition_key), value)

col = backend.group_by_key(col, "Group by (privacy_id, partition_key)")
# ((privacy_id, partition_key), [value])

return backend.map_values(col, aggregate_fn, "Apply aggregate_fn")


def collect_values_per_partition_key_per_privacy_id(
col, backend: pipeline_backend.PipelineBackend):
"""Collects values into a list for each privacy_id and partition_key.
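For intuition about LinfSampler above: the heavy lifting is backend.sample_fixed_per_key, which keeps at most max_contributions_per_partition values per (privacy_id, partition_key). A minimal self-contained sketch of that operation (a hypothetical helper, not PipelineDP's backend implementation):

import collections
import random

def sample_fixed_per_key(pairs, cap):
    # Group values by key, then keep at most `cap` of them per key,
    # chosen uniformly at random.
    grouped = collections.defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return [(key, values if len(values) <= cap else random.sample(values, cap))
            for key, values in grouped.items()]

# pid1 contributed twice to pk1; only one contribution survives sampling.
print(sample_fixed_per_key([(('pid1', 'pk1'), 1), (('pid1', 'pk1'), 2)], cap=1))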
14 changes: 11 additions & 3 deletions pipeline_dp/dp_engine.py
@@ -386,10 +386,18 @@ def _create_contribution_bounder(
return \
contribution_bounders.SamplingPerPrivacyIdContributionBounder(
)
-        if expects_per_partition_sampling:
-            return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
-            )
-        return contribution_bounders.SamplingCrossPartitionContributionBounder()
+        if params.perform_cross_partition_contribution_bounding:
+            if expects_per_partition_sampling:
+                return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
+                )
+            return contribution_bounders.SamplingCrossPartitionContributionBounder(
+            )
+        # No cross-partition contribution bounding.
+        if expects_per_partition_sampling:
+            return contribution_bounders.LinfSampler()
+        # No sampling, but combiners themselves do per-partition contribution
+        # bounding.
+        return contribution_bounders.NoOpSampler()

def _extract_columns(self, col,
data_extractors: pipeline_dp.DataExtractors):
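The dispatch above returns NoOpSampler when cross-partition bounding is off and no per-partition sampling is expected, e.g. SUM with min_sum_per_partition/max_sum_per_partition bounds, because the combiner can enforce the per-partition bound by clipping the partition sum itself. A simplified illustration (hypothetical helper, not the combiner API):

def clip_partition_sum(values, min_sum, max_sum):
    # Clamp a privacy unit's per-partition sum into [min_sum, max_sum].
    return max(min_sum, min(max_sum, sum(values)))

print(clip_partition_sum([3, 5, 9], min_sum=0, max_sum=10))  # 10, not 17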
57 changes: 57 additions & 0 deletions tests/contribution_bounders_test.py
@@ -24,6 +24,8 @@
CrossAndPerPartitionContributionParams = collections.namedtuple(
"CrossAndPerPartitionContributionParams",
["max_partitions_contributed", "max_contributions_per_partition"])
PerPartitionContributionParams = collections.namedtuple(
"PerPartitionContributionParams", ["max_contributions_per_partition"])

aggregate_fn = lambda input_value: (len(input_value), np.sum(input_value),
np.sum(np.square(input_value)))
@@ -150,3 +152,58 @@ def test_contribution_bounding_empty_col(self):
bound_result = self._run_contribution_bounding(input, max_contributions)

self.assertEmpty(bound_result)


class LinfSamplerTest(parameterized.TestCase):

def _run_sampling(self, input, max_contributions_per_partition):
params = PerPartitionContributionParams(max_contributions_per_partition)

bounder = contribution_bounders.LinfSampler()
return list(
bounder.bound_contributions(input, params,
pipeline_dp.LocalBackend(),
_create_report_generator(),
lambda x: x))

def test_sampling_applied(self):
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid2', 'pk1', 3),
('pid2', 'pk1', 4)]
max_contributions_per_partition = 1
bound_result = self._run_sampling(input,
max_contributions_per_partition)
bound_result = dict(bound_result)
# {(privacy_id, partition_key): [values]}
self.assertLen(bound_result, 2)
self.assertLen(bound_result[('pid1', 'pk1')], 1)
self.assertLen(bound_result[('pid2', 'pk1')], 1)

def test_sampling_applied_nothing_dropped(self):

Review comment (Collaborator): nit: I would rename this method to "test_more_contributions_than_bound_nothing_dropped" to make it clear that we expect all contributions to be kept under specific circumstances.

Reply (Collaborator, Author): done
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid1', 'pk1', 3)]
max_contributions_per_partition = 3

bound_result = self._run_sampling(input,
max_contributions_per_partition)
self.assertEqual(bound_result, [(('pid1', 'pk1'), [1, 2, 3])])

def test_empty_col(self):
self.assertEmpty(
self._run_sampling([], max_contributions_per_partition=1))


class NoOpContributionBounderTest(parameterized.TestCase):

def test_no_contribution_bounding_applied(self):
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid2', 'pk1', 3),
('pid3', 'pk2', 4)]
bounder = contribution_bounders.NoOpSampler()
bound_result = bounder.bound_contributions(
input,
params=(),
backend=pipeline_dp.LocalBackend(),
report_generator=_create_report_generator(),
aggregate_fn=lambda x: x)
self.assertEqual(list(bound_result), [(('pid1', 'pk1'), [1, 2]),
(('pid2', 'pk1'), [3]),
(('pid3', 'pk2'), [4])])
54 changes: 54 additions & 0 deletions tests/dp_engine_test.py
@@ -572,6 +572,60 @@ def test_aggregate_computation_graph_per_partition_bounding(
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.contribution_bounders.LinfSampler.bound_contributions')
def test_aggregate_computation_graph_only_linf_sampling(
self, mock_bound_contributions):
# Arrange
aggregate_params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
metrics=[pipeline_dp.Metrics.SUM],
min_value=0,
max_value=1,
max_partitions_contributed=1,
max_contributions_per_partition=1,
perform_cross_partition_contribution_bounding=False)

engine = self._create_dp_engine_default()
mock_bound_contributions.return_value = []

engine.aggregate(col=[0],
params=aggregate_params,
data_extractors=self._get_default_extractors())

# Assert
mock_bound_contributions.assert_called_with(unittest.mock.ANY,
aggregate_params,
unittest.mock.ANY,
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.contribution_bounders.NoOpSampler.bound_contributions')
def test_aggregate_computation_graph_only_no_sampling_for_sum_when_no_cross_partition(
        self, mock_bound_contributions):

Review comment (Collaborator): In "test_aggregate_computation_graph_only_no_sampling_for_sum_when_no_cross_partition", what does "only" refer to? Is it "only test graph and nothing else"? In that case, I think we can drop "only"; "test_computational_graph" already indicates what's being tested.

Reply (Collaborator, Author): thx, we can drop "only". Done
# Arrange
aggregate_params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
metrics=[pipeline_dp.Metrics.SUM],
min_sum_per_partition=0,
max_sum_per_partition=1,
max_partitions_contributed=1,
max_contributions_per_partition=1,
perform_cross_partition_contribution_bounding=False)

engine = self._create_dp_engine_default()
mock_bound_contributions.return_value = []

engine.aggregate(col=[0],
params=aggregate_params,
data_extractors=self._get_default_extractors())

# Assert
mock_bound_contributions.assert_called_with(unittest.mock.ANY,
aggregate_params,
unittest.mock.ANY,
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.dp_engine.DPEngine._drop_partitions',)
def test_aggregate_no_partition_filtering_public_partitions(
self, mock_drop_partitions):