Skip to content

Commit

Permalink
Add pre_thresholding to Beam PrivatePCollection API (#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored May 17, 2024
1 parent afbb949 commit 9249782
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 29 deletions.
34 changes: 19 additions & 15 deletions examples/movie_view_ratings/run_on_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
For running:
1. Install Python and run on the command line `pip install pipeline-dp apache-beam absl-py`
2. Run python python run_on_beam.py --input_file=<path to data.txt from 3> --output_file=<...>
2. Run python run_on_beam.py --input_file=<path to data.txt from 3> --output_file=<...>
"""

Expand Down Expand Up @@ -60,21 +60,25 @@ def main(unused_argv):

explain_computation_report = pipeline_dp.ExplainComputationReport()
# Calculate the private sum
params = SumParams(
# Limits to how much one user can contribute:
# .. at most two movies rated per user
max_partitions_contributed=2,
# .. at most one rating for each movie
max_contributions_per_partition=1,
# .. with minimal rating of "1"
min_value=1,
# .. and maximum rating of "5"
max_value=5,
# The aggregation key: we're grouping data by movies
partition_extractor=lambda mv: mv.movie_id,
# The value we're aggregating: we're summing up ratings
value_extractor=lambda mv: mv.rating,
# Limit the minimum partition size to release
pre_threshold=5)

dp_result = private_movie_views | "Private Sum" >> private_beam.Sum(
SumParams(
# Limits to how much one user can contribute:
# .. at most two movies rated per user
max_partitions_contributed=2,
# .. at most one rating for each movie
max_contributions_per_partition=1,
# .. with minimal rating of "1"
min_value=1,
# .. and maximum rating of "5"
max_value=5,
# The aggregation key: we're grouping data by movies
partition_extractor=lambda mv: mv.movie_id,
# The value we're aggregating: we're summing up ratings
value_extractor=lambda mv: mv.rating))
params)
budget_accountant.compute_budgets()

# Generate the Explain Computation Report. It must be called after
Expand Down
5 changes: 5 additions & 0 deletions examples/movie_view_ratings/run_without_frameworks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
flags.DEFINE_bool(
'pld_accounting', False, 'If false Naive budget accounting '
'is used, if true PLD accounting')
flags.DEFINE_integer('pre_threshold', None,
'Pre threshold which is used in the DP aggregation')


def main(unused_argv):
Expand Down Expand Up @@ -77,6 +79,9 @@ def main(unused_argv):
# .. and maximum rating of "5"
max_value=5)

if FLAGS.pre_threshold:
params.pre_threshold = FLAGS.pre_threshold

# Specify how to extract privacy_id, partition_key and value from an
# element of movie_views.
data_extractors = pipeline_dp.DataExtractors(
Expand Down
40 changes: 40 additions & 0 deletions pipeline_dp/aggregate_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,13 @@ class SumParams:
max_contributions_per_partition. This option can be used if the
dataset does not contain any identifiers that can be used to enforce
contribution bounds automatically.
pre_threshold: the minimum number of privacy units required to keep
a partition in private partition selection. Note that this is in
addition to differentially private partition selection, so a
partition with at least pre_threshold privacy units isn't
necessarily kept. It is ignored when public partitions are used.
More details on pre-thresholding are in
https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
"""
max_partitions_contributed: int
max_contributions_per_partition: int
Expand All @@ -458,6 +465,7 @@ class SumParams:
budget_weight: float = 1
noise_kind: NoiseKind = NoiseKind.LAPLACE
contribution_bounds_already_enforced: bool = False
pre_threshold: Optional[int] = None

# TODO: add validation in __post_init__

Expand Down Expand Up @@ -488,6 +496,13 @@ class VarianceParams:
max_contributions_per_partition. This option can be used if the
dataset does not contain any identifiers that can be used to enforce
contribution bounds automatically.
pre_threshold: the minimum number of privacy units required to keep
a partition in private partition selection. Note that this is in
addition to differentially private partition selection, so a
partition with at least pre_threshold privacy units isn't
necessarily kept. It is ignored when public partitions are used.
More details on pre-thresholding are in
https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
"""
max_partitions_contributed: int
max_contributions_per_partition: int
Expand All @@ -498,6 +513,7 @@ class VarianceParams:
budget_weight: float = 1
noise_kind: NoiseKind = NoiseKind.LAPLACE
contribution_bounds_already_enforced: bool = False
pre_threshold: Optional[int] = None

# TODO: add validation in __post_init__

Expand Down Expand Up @@ -526,6 +542,13 @@ class MeanParams:
max_contributions_per_partition. This option can be used if the
dataset does not contain any identifiers that can be used to enforce
contribution bounds automatically.
pre_threshold: the minimum number of privacy units required to keep
a partition in private partition selection. Note that this is in
addition to differentially private partition selection, so a
partition with at least pre_threshold privacy units isn't
necessarily kept. It is ignored when public partitions are used.
More details on pre-thresholding are in
https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
"""
max_partitions_contributed: int
max_contributions_per_partition: int
Expand All @@ -536,6 +559,7 @@ class MeanParams:
budget_weight: float = 1
noise_kind: NoiseKind = NoiseKind.LAPLACE
contribution_bounds_already_enforced: bool = False
pre_threshold: Optional[int] = None

# TODO: add validation in __post_init__

Expand All @@ -559,6 +583,13 @@ class CountParams:
max_contributions_per_partition. This option can be used if the
dataset does not contain any identifiers that can be used to enforce
contribution bounds automatically.
pre_threshold: the minimum number of privacy units required to keep
a partition in private partition selection. Note that this is in
addition to differentially private partition selection, so a
partition with at least pre_threshold privacy units isn't
necessarily kept. It is ignored when public partitions are used.
More details on pre-thresholding are in
https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
"""

noise_kind: NoiseKind
Expand All @@ -567,6 +598,7 @@ class CountParams:
partition_extractor: Callable
budget_weight: float = 1
contribution_bounds_already_enforced: bool = False
pre_threshold: Optional[int] = None

# TODO: add validation in __post_init__

Expand All @@ -591,13 +623,21 @@ class PrivacyIdCountParams:
max_contributions_per_partition. This option can be used if the
dataset does not contain any identifiers that can be used to enforce
contribution bounds automatically.
pre_threshold: the minimum number of privacy units required to keep
a partition in private partition selection. Note that this is in
addition to differentially private partition selection, so a
partition with at least pre_threshold privacy units isn't
necessarily kept. It is ignored when public partitions are used.
More details on pre-thresholding are in
https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
"""

noise_kind: NoiseKind
max_partitions_contributed: int
partition_extractor: Callable
budget_weight: float = 1
contribution_bounds_already_enforced: bool = False
pre_threshold: Optional[int] = None

# TODO: add validation in __post_init__

Expand Down
15 changes: 10 additions & 5 deletions pipeline_dp/private_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
max_contributions_per_partition=self._variance_params.
max_contributions_per_partition,
min_value=self._variance_params.min_value,
max_value=self._variance_params.max_value)
max_value=self._variance_params.max_value,
pre_threshold=self._variance_params.pre_threshold)

data_extractors = pipeline_dp.DataExtractors(
partition_extractor=lambda x: self._variance_params.
Expand Down Expand Up @@ -214,7 +215,8 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
max_contributions_per_partition=self._mean_params.
max_contributions_per_partition,
min_value=self._mean_params.min_value,
max_value=self._mean_params.max_value)
max_value=self._mean_params.max_value,
pre_threshold=self._mean_params.pre_threshold)

data_extractors = pipeline_dp.DataExtractors(
partition_extractor=lambda x: self._mean_params.partition_extractor(
Expand Down Expand Up @@ -277,7 +279,8 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
max_contributions_per_partition=self._sum_params.
max_contributions_per_partition,
min_value=self._sum_params.min_value,
max_value=self._sum_params.max_value)
max_value=self._sum_params.max_value,
pre_threshold=self._sum_params.pre_threshold)

data_extractors = pipeline_dp.DataExtractors(
partition_extractor=lambda x: self._sum_params.partition_extractor(
Expand Down Expand Up @@ -338,7 +341,8 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
max_partitions_contributed=self._count_params.
max_partitions_contributed,
max_contributions_per_partition=self._count_params.
max_contributions_per_partition)
max_contributions_per_partition,
pre_threshold=self._count_params.pre_threshold)

data_extractors = pipeline_dp.DataExtractors(
partition_extractor=lambda x: self._count_params.
Expand Down Expand Up @@ -400,7 +404,8 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT],
max_partitions_contributed=self._privacy_id_count_params.
max_partitions_contributed,
max_contributions_per_partition=1)
max_contributions_per_partition=1,
pre_threshold=self._privacy_id_count_params.pre_threshold)

data_extractors = pipeline_dp.DataExtractors(
partition_extractor=lambda x: self._privacy_id_count_params.
Expand Down
27 changes: 18 additions & 9 deletions tests/private_beam_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ def test_variance_calls_aggregate_with_params(self, mock_aggregate):
max_value=5,
budget_weight=1,
partition_extractor=lambda x: f"pk:{x // 10}",
value_extractor=lambda x: x)
value_extractor=lambda x: x,
pre_threshold=3)

# Act
transformer = private_beam.Variance(variance_params=variance_params,
Expand All @@ -182,7 +183,8 @@ def test_variance_calls_aggregate_with_params(self, mock_aggregate):
max_contributions_per_partition=variance_params.
max_contributions_per_partition,
min_value=variance_params.min_value,
max_value=variance_params.max_value)
max_value=variance_params.max_value,
pre_threshold=3)
self.assertEqual(params, args[1])

def test_variance_returns_sensible_result(self):
Expand All @@ -208,7 +210,8 @@ def test_variance_returns_sensible_result(self):
max_value=2.7889, # 100 should be clipped to this value
budget_weight=1,
partition_extractor=lambda x: x[1],
value_extractor=lambda x: x[2])
value_extractor=lambda x: x[2],
pre_threshold=5)

# Act
result = private_collection | private_beam.Variance(
Expand Down Expand Up @@ -288,7 +291,8 @@ def test_mean_calls_aggregate_with_params(self, mock_aggregate):
max_value=5,
budget_weight=1,
partition_extractor=lambda x: f"pk:{x // 10}",
value_extractor=lambda x: x)
value_extractor=lambda x: x,
pre_threshold=7)

# Act
transformer = private_beam.Mean(mean_params=mean_params,
Expand All @@ -309,7 +313,8 @@ def test_mean_calls_aggregate_with_params(self, mock_aggregate):
max_contributions_per_partition=mean_params.
max_contributions_per_partition,
min_value=mean_params.min_value,
max_value=mean_params.max_value)
max_value=mean_params.max_value,
pre_threshold=7)
self.assertEqual(params, args[1])

def test_mean_returns_sensible_result(self):
Expand Down Expand Up @@ -536,7 +541,8 @@ def test_count_calls_aggregate_with_params(self, mock_aggregate):
max_partitions_contributed=2,
max_contributions_per_partition=3,
budget_weight=1,
partition_extractor=lambda x: f"pk:{x // 10}")
partition_extractor=lambda x: f"pk:{x // 10}",
pre_threshold=3)

# Act
transformer = private_beam.Count(count_params=count_params)
Expand All @@ -554,7 +560,8 @@ def test_count_calls_aggregate_with_params(self, mock_aggregate):
max_partitions_contributed=count_params.
max_partitions_contributed,
max_contributions_per_partition=count_params.
max_contributions_per_partition)
max_contributions_per_partition,
pre_threshold=3)
self.assertEqual(args[1], params)

def test_count_returns_sensible_result(self):
Expand Down Expand Up @@ -646,7 +653,8 @@ def test_privacy_id_count_calls_aggregate_with_params(self, mock_aggregate):
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
max_partitions_contributed=2,
budget_weight=1,
partition_extractor=lambda x: f"pk:{x // 10}")
partition_extractor=lambda x: f"pk:{x // 10}",
pre_threshold=8)

# Act
transformer = private_beam.PrivacyIdCount(
Expand All @@ -664,7 +672,8 @@ def test_privacy_id_count_calls_aggregate_with_params(self, mock_aggregate):
metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT],
max_partitions_contributed=privacy_id_count_params.
max_partitions_contributed,
max_contributions_per_partition=1)
max_contributions_per_partition=1,
pre_threshold=8)
self.assertEqual(args[1], params)

def test_privacy_id_count_returns_sensible_result(self):
Expand Down

0 comments on commit 9249782

Please sign in to comment.