Skip to content

Commit

Permalink
Release version v2.7.0
Browse files Browse the repository at this point in the history
  - fix: Update op-allowed user group for GCP G3P

GitOrigin-RevId: b5ce39cfc54d7486390864a294cd44a751d04c6c
  • Loading branch information
Privacy Sandbox Team authored and copybara-github committed Aug 9, 2024
1 parent ec108b9 commit 86356cd
Show file tree
Hide file tree
Showing 80 changed files with 1,244 additions and 1,935 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## [2.7.0](https://github.com/privacysandbox/aggregation-service/compare/v2.6.0...v2.7.0) (2024-08-01)

- Added support for aggregating reports belonging to multiple reporting origins under the same
reporting site in a single aggregation job.
- [GCP Only] Updated coordinator endpoints to new Google/Third-Party coordinator pair.

## [2.6.0](https://github.com/privacysandbox/aggregation-service/compare/v2.5.0...v2.6.0) (2024-07-19)

- Enabled support for
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.6.0
2.7.0
2 changes: 1 addition & 1 deletion build-scripts/DEBIAN_CONTAINER_DIGEST
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sha256:39868a6f452462b70cf720a8daff250c63e7342970e749059c105bf7c1e8eeaf
sha256:16112ae93b810eb1ec6d1db6e01835d2444c8ca99aa678e03dd104ea3ec68408
2 changes: 1 addition & 1 deletion build-scripts/gcp/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
steps:
- name: '$_BUILD_IMAGE_REPO_PATH/bazel-build-container:$_VERSION'
script: |
bazel run worker/gcp:worker_mp_gcp_prod -- -dst "$_IMAGE_REPO_PATH/$_IMAGE_NAME:$_IMAGE_TAG"
bazel run worker/gcp:worker_mp_gcp_g3p_prod -- -dst "$_IMAGE_REPO_PATH/$_IMAGE_NAME:$_IMAGE_TAG"
bazel run //terraform/gcp:frontend_service_http_cloud_function_release \
--//terraform/gcp:bucket_flag=$_JARS_PUBLISH_BUCKET --//terraform/gcp:bucket_path_flag=$_JARS_PUBLISH_BUCKET_PATH \
-- --version=$_VERSION
Expand Down
4 changes: 2 additions & 2 deletions build_defs/container_dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
# - java_base: Distroless image for running Java.
################################################################################

# Updated as of: 2024-07-19
# Updated as of: 2024-07-26

CONTAINER_DEPS = {
"amazonlinux_2": {
"digest": "sha256:7081389e0a1d55d5c05a6bab72fb8a82b37c72a724365c6104c7fbc5bcdb2e09",
"digest": "sha256:b2ed30084a71c34c0f41a5add7dd623a2e623f2c3b50117c720bbc02d7653fa1",
"registry": "index.docker.io",
"repository": "amazonlinux",
},
Expand Down
14 changes: 14 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ POST
// This should be same as the reporting_origin present in the reports' shared_info.
"attribution_report_to": <string>,

// [Optional] Reporting Site.
// This should be the reporting site that is onboared to aggregation service.
// Note: All reports in the request should have reporting origins which
// belong to the reporting site mentioned in this parameter. This parameter
// and the "attribution_report_to" parameter are mutually exclusive, exactly
// one of the two parameters should be provided in the request.
"reporting_site": "<string>"

// [Optional] Differential privacy epsilon value to be used
// for this job. 0.0 < debug_privacy_epsilon <= 64.0. The
// value can be varied so that tests with different epsilon
Expand Down Expand Up @@ -156,6 +164,10 @@ These are the validations that are done before the aggregation begins.
ATTRIBUTION_REPORT_TO_MISMATCH error counter. Aggregatable report validations and error counters
can be found in the
[Input Aggregatable Report Validations](#input-aggregatable-report-validations) below
4. Job request's `job_parameters` should contain exactly one of `attribution_report_to` and
`reporting_site`.
5. If `job_parameters.reporting_site` is provided, `shared_info.reporting_origin` of all
aggregatable reports should belong to this reporting site.

Return code:
[INVALID_JOB](java/com/google/aggregate/adtech/worker/AggregationWorkerReturnCode.java#L38)
Expand Down Expand Up @@ -227,6 +239,8 @@ Not found: 404 Not Found
"output_domain_bucket_name": <string>,
// Reporting URL
"attribution_report_to" : <string>,
// [Optional] Reporting site value from the CreateJob request, if provided.
"reporting_site": <string>
// [Optional] differential privacy epsilon value to be used
// for this job. 0.0 < debug_privacy_epsilon <= 64.0. The
// value can be varied so that tests with different epsilon
Expand Down
9 changes: 9 additions & 0 deletions docs/gcp-aggregation-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,12 @@ _Note: If you use self-built artifacts described in
[build-scripts/gcp](/build-scripts/gcp/README.md), run `bash fetch_terraform.sh` instead of
`bash download_prebuilt_dependencies.sh` and make sure you updated your dependencies in the `jars`
folder._
_Note: When migrating to new coordinator pair from version 2.[4|5|6].z to 2.7.z or later, ensure the
file `/terraform/gcp/environments/shared/release_params.auto.tfvars` was updated with the following
values:_
```sh
coordinator_a_impersonate_service_account = "a-opallowedusr@ps-msmt-coord-prd-g3p-svcacc.iam.gserviceaccount.com"
coordinator_b_impersonate_service_account = "[email protected]"
```
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.aggregate.adtech.worker;

import static com.google.aggregate.adtech.worker.util.DebugSupportHelper.getDebugFilePrefix;
import static com.google.scp.operator.cpio.blobstorageclient.BlobStorageClient.BlobStorageClientException;
import static com.google.scp.operator.cpio.blobstorageclient.BlobStorageClient.getDataLocation;
import static com.google.scp.operator.shared.model.BackendModelUtil.toJobKeyString;
import static java.lang.annotation.ElementType.FIELD;
Expand All @@ -31,10 +30,8 @@
import com.google.aggregate.adtech.worker.Annotations.ResultWriter;
import com.google.aggregate.adtech.worker.exceptions.ResultLogException;
import com.google.aggregate.adtech.worker.model.AggregatedFact;
import com.google.aggregate.adtech.worker.model.EncryptedReport;
import com.google.aggregate.adtech.worker.util.OutputShardFileHelper;
import com.google.aggregate.adtech.worker.writer.LocalResultFileWriter;
import com.google.aggregate.adtech.worker.writer.LocalResultFileWriter.FileWriteException;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -45,7 +42,6 @@
import com.google.scp.operator.cpio.blobstorageclient.BlobStorageClient;
import com.google.scp.operator.cpio.blobstorageclient.model.DataLocation;
import com.google.scp.operator.cpio.jobclient.model.Job;
import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.nio.file.Files;
Expand All @@ -69,7 +65,6 @@ public final class LocalFileToCloudStorageLogger implements ResultLogger {
private final BlobStorageClient blobStorageClient;
private final Path workingDirectory;
private final ListeningExecutorService blockingThreadPool;
private static final String reencryptedReportFileNamePrefix = "reencrypted-";

@Inject
LocalFileToCloudStorageLogger(
Expand Down Expand Up @@ -165,26 +160,6 @@ public void logResults(ImmutableList<AggregatedFact> results, Job ctx, boolean i
}
}

@Override
public void logReports(ImmutableList<EncryptedReport> reports, Job ctx, String shardNumber)
throws ResultLogException {
String localFileName =
toJobKeyString(ctx.jobKey())
+ "-"
+ reencryptedReportFileNamePrefix
+ shardNumber
+ ".avro";
Path localReportsFilePath =
workingDirectory
.getFileSystem()
.getPath(Paths.get(workingDirectory.toString(), localFileName).toString());
try {
writeReportsToCloud(reports.stream(), ctx, localReportsFilePath, localResultFileWriter);
} catch (FileWriteException | BlobStorageClientException | IOException e) {
throw new ResultLogException(e);
}
}

@SuppressWarnings("UnstableApiUsage")
private ListenableFuture<Void> writeFile(
Stream<AggregatedFact> aggregatedFacts,
Expand Down Expand Up @@ -220,21 +195,6 @@ private ListenableFuture<Void> writeFile(
blockingThreadPool);
}

private void writeReportsToCloud(
Stream<EncryptedReport> reports, Job ctx, Path localFilepath, LocalResultFileWriter writer)
throws IOException, FileWriteException, BlobStorageClientException {
Files.createDirectories(workingDirectory);
writer.writeLocalReportFile(reports, localFilepath);

String outputDataBlobBucket = ctx.requestInfo().getOutputDataBucketName();
String outputDataBlobPrefix = localFilepath.getFileName().toString();

DataLocation resultLocation = getDataLocation(outputDataBlobBucket, outputDataBlobPrefix);

blobStorageClient.putBlob(resultLocation, localFilepath);
Files.deleteIfExists(localFilepath);
}

/**
* The local file name has a random UUID in it to prevent cases where an item is processed twice
* by the same worker and clobbers other files being written.
Expand Down
18 changes: 0 additions & 18 deletions java/com/google/aggregate/adtech/worker/LocalResultLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.aggregate.adtech.worker.LibraryAnnotations.LocalOutputDirectory;
import com.google.aggregate.adtech.worker.exceptions.ResultLogException;
import com.google.aggregate.adtech.worker.model.AggregatedFact;
import com.google.aggregate.adtech.worker.model.EncryptedReport;
import com.google.aggregate.adtech.worker.writer.LocalResultFileWriter;
import com.google.aggregate.adtech.worker.writer.LocalResultFileWriter.FileWriteException;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -68,23 +67,6 @@ public void logResults(ImmutableList<AggregatedFact> results, Job ctx, boolean i
isDebugRun ? localDebugResultFileWriter : localResultFileWriter);
}

// TODO(b/315199032): Add local runner test
@Override
public void logReports(ImmutableList<EncryptedReport> reports, Job ctx, String shardNumber)
throws ResultLogException {
String localFileName = "reencrypted_report.avro";
Path localReportsFilePath =
workingDirectory
.getFileSystem()
.getPath(Paths.get(workingDirectory.toString(), localFileName).toString());
try {
Files.createDirectories(workingDirectory);
localResultFileWriter.writeLocalReportFile(reports.stream(), localReportsFilePath);
} catch (IOException | FileWriteException e) {
throw new ResultLogException(e);
}
}

private DataLocation writeFile(
Stream<AggregatedFact> results, Job ctx, Path filePath, LocalResultFileWriter writer)
throws ResultLogException {
Expand Down
5 changes: 0 additions & 5 deletions java/com/google/aggregate/adtech/worker/ResultLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.aggregate.adtech.worker.exceptions.ResultLogException;
import com.google.aggregate.adtech.worker.model.AggregatedFact;
import com.google.aggregate.adtech.worker.model.EncryptedReport;
import com.google.common.collect.ImmutableList;
import com.google.scp.operator.cpio.jobclient.model.Job;

Expand All @@ -28,8 +27,4 @@ public interface ResultLogger {
/** Takes the aggregation results and logs them to results. */
void logResults(ImmutableList<AggregatedFact> results, Job ctx, boolean isDebugRun)
throws ResultLogException;

/** Logs encrypted aggregatable reports. */
void logReports(ImmutableList<EncryptedReport> results, Job ctx, String shardNumber)
throws ResultLogException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static com.google.aggregate.adtech.worker.util.JobUtils.JOB_PARAM_OUTPUT_DOMAIN_BUCKET_NAME;
import static com.google.aggregate.adtech.worker.util.JobUtils.JOB_PARAM_REPORT_ERROR_THRESHOLD_PERCENTAGE;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.scp.operator.shared.model.BackendModelUtil.toJobKeyString;

import com.google.aggregate.adtech.worker.AggregationWorkerReturnCode;
Expand Down Expand Up @@ -76,8 +75,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.errorprone.annotations.Var;
import com.google.privacysandbox.otel.OTelConfiguration;
Expand All @@ -94,7 +91,6 @@
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.security.AccessControlException;
import java.util.List;
import java.util.Map;
Expand All @@ -115,6 +111,8 @@ public final class ConcurrentAggregationProcessor implements JobProcessor {
public static final String JOB_PARAM_ATTRIBUTION_REPORT_TO = "attribution_report_to";
// Key to indicate whether this is a debug job
public static final String JOB_PARAM_DEBUG_RUN = "debug_run";
// Key for user provided reporting site value in the job params of the job request.
public static final String JOB_PARAM_REPORTING_SITE = "reporting_site";

private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
// In aggregation service, reading is much faster than decryption, and most of the time, it waits
Expand Down Expand Up @@ -153,6 +151,8 @@ public final class ConcurrentAggregationProcessor implements JobProcessor {
private final boolean enablePrivacyBudgetKeyFiltering;
private final OTelConfiguration oTelConfiguration;
private final double defaultReportErrorThresholdPercentage;

// TODO(b/338219415): Reuse this flag to enable full streaming approach.
private final Boolean streamingOutputDomainProcessing;

@Inject
Expand Down Expand Up @@ -223,7 +223,7 @@ public JobResult process(Job job)
if (jobParams.containsKey(JOB_PARAM_OUTPUT_DOMAIN_BUCKET_NAME)
&& jobParams.containsKey(JOB_PARAM_OUTPUT_DOMAIN_BLOB_PREFIX)
&& (!jobParams.get(JOB_PARAM_OUTPUT_DOMAIN_BUCKET_NAME).isEmpty()
|| !jobParams.get(JOB_PARAM_OUTPUT_DOMAIN_BLOB_PREFIX).isEmpty())) {
|| !jobParams.get(JOB_PARAM_OUTPUT_DOMAIN_BLOB_PREFIX).isEmpty())) {
outputDomainLocation =
Optional.of(
BlobStorageClient.getDataLocation(
Expand Down Expand Up @@ -297,34 +297,16 @@ public JobResult process(Job job)

NoisedAggregatedResultSet noisedResultSet;
try {
if (streamingOutputDomainProcessing) {
noisedResultSet =
conflateWithDomainAndAddNoiseStreaming(
outputDomainLocation,
outputDomainShards,
aggregationEngine,
debugPrivacyEpsilon,
debugRun);

} else {
noisedResultSet =
conflateWithDomainAndAddNoise(
outputDomainLocation,
outputDomainShards,
aggregationEngine,
debugPrivacyEpsilon,
debugRun);
}
} catch (DomainReadException e) {
throw new AggregationJobProcessException(
INPUT_DATA_READ_FAILED, "Exception while reading domain input data.", e.getCause());
} catch (ExecutionException e) {
if (e.getCause() instanceof DomainReadException) {
throw new AggregationJobProcessException(
INPUT_DATA_READ_FAILED, "Exception while reading domain input data.", e.getCause());
}
throw new AggregationJobProcessException(
INTERNAL_ERROR, "Exception in processing domain.", e);
}

processingStopwatch.stop();
Expand Down Expand Up @@ -399,34 +381,6 @@ private NoisedAggregatedResultSet conflateWithDomainAndAddNoiseStreaming(
debugRun);
}

private NoisedAggregatedResultSet conflateWithDomainAndAddNoise(
Optional<DataLocation> outputDomainLocation,
ImmutableList<DataLocation> outputDomainShards,
AggregationEngine engine,
Optional<Double> debugPrivacyEpsilon,
Boolean debugRun)
throws DomainReadException, ExecutionException, InterruptedException {
@Var
ListenableFuture<ImmutableSet<BigInteger>> outputDomainFuture =
outputDomainLocation
.map(loc -> outputDomainProcessor.readAndDedupeDomain(loc, outputDomainShards))
.orElse(immediateFuture(ImmutableSet.of()));

ListenableFuture<NoisedAggregatedResultSet> aggregationFinalFuture =
Futures.transform(
outputDomainFuture,
outputDomain ->
outputDomainProcessor.adjustAggregationWithDomainAndNoise(
noisedAggregationRunner,
outputDomain,
engine.makeAggregation(),
debugPrivacyEpsilon,
debugRun),
nonBlockingThreadPool);

return aggregationFinalFuture.get();
}

private double getReportErrorThresholdPercentage(Map<String, String> jobParams) {
String jobParamsReportErrorThresholdPercentage =
jobParams.getOrDefault(JOB_PARAM_REPORT_ERROR_THRESHOLD_PERCENTAGE, null);
Expand All @@ -449,21 +403,34 @@ private void consumePrivacyBudgetUnits(ImmutableList<PrivacyBudgetUnit> budgetsT
return;
}

String claimedIdentity;
// Validations ensure that at least one of the parameters will always exist.
if (job.requestInfo().getJobParametersMap().containsKey(JOB_PARAM_REPORTING_SITE)) {
claimedIdentity = job.requestInfo().getJobParametersMap().get(JOB_PARAM_REPORTING_SITE);
} else {
try {
claimedIdentity =
ReportingOriginUtils.convertReportingOriginToSite(
job.requestInfo().getJobParametersMap().get(JOB_PARAM_ATTRIBUTION_REPORT_TO));
} catch (InvalidReportingOriginException e) {
// This should never happen due to validations ensuring that the reporting origin is always
// valid.
throw new IllegalStateException(
"Invalid reporting origin found while consuming budget, this should not happen as job"
+ " validations ensure the reporting origin is always valid.",
e);
}
}

ImmutableList<PrivacyBudgetUnit> missingPrivacyBudgetUnits;
try {
try (Timer t =
oTelConfiguration.createDebugTimerStarted("pbs_latency", toJobKeyString(job.jobKey()))) {
final String reportingOrigin =
job.requestInfo().getJobParametersMap().get(JOB_PARAM_ATTRIBUTION_REPORT_TO);
final String claimedIdentity =
ReportingOriginUtils.convertReportingOriginToSite(reportingOrigin);
missingPrivacyBudgetUnits =
privacyBudgetingServiceBridge.consumePrivacyBudget(budgetsToConsume, claimedIdentity);
} catch (InvalidReportingOriginException e) {
throw new AggregationJobProcessException(
INVALID_JOB,
"The attribution_report_to parameter specified in the CreateJob request is not under a"
+ " known public suffix.");
privacyBudgetingServiceBridge.consumePrivacyBudget(
budgetsToConsume, claimedIdentity);
}
} catch (PrivacyBudgetingServiceBridgeException e) {
if (e.getStatusCode() != null) {
Expand Down
Loading

0 comments on commit 86356cd

Please sign in to comment.