Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RRIO] [Throttle] transform that slows down element transmission without an external resource #30123

Closed
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5e0bc0b
WIP: Integer based restriction
damondouglas Jan 15, 2024
dd78db6
Gotcha splittable DoFn output when tryClaim false
damondouglas Jan 16, 2024
38207d8
Increase unit test coverage; implement metrics
damondouglas Jan 19, 2024
7b936c3
Cleanup tests and documentation
damondouglas Jan 19, 2024
0254e49
Add additional metrics
damondouglas Jan 19, 2024
9186cdd
Add throttle to rrio main class
damondouglas Jan 20, 2024
f86c307
Finalize Preventive Throttling
damondouglas Jan 23, 2024
0c7ae64
Patch code comments
damondouglas Feb 1, 2024
5fab667
ProcessContinuation is a problem
damondouglas Feb 3, 2024
19ca6cf
Refactor per PR comments
damondouglas Feb 4, 2024
01f3214
Fix typo
damondouglas Feb 4, 2024
89a5074
Detect error with unbounded GlobalWindow
damondouglas Feb 5, 2024
ea9d3e7
WIP: fix missing data
damondouglas Feb 5, 2024
5e3bbf2
Add error tolerance to time dependent tests
damondouglas Feb 5, 2024
2aa8a37
Fix streaming cases
damondouglas Feb 6, 2024
9d7435f
Add user custom type test
damondouglas Feb 6, 2024
dc322ea
Preserve the timestamp; Remove unnecessary code
damondouglas Feb 7, 2024
6b50102
Remove prior window tests
damondouglas Feb 7, 2024
119f628
Finished window assignment tests
damondouglas Feb 7, 2024
b7eb1e6
Simplify windowing tests
damondouglas Feb 11, 2024
658f213
Use combineFns to validate windows
damondouglas Feb 13, 2024
fc0bfa4
Remove global windowing in Throttle
damondouglas Feb 13, 2024
11e75e6
Implement Throttle Fn for bounded sources; defer unbounded for future PR
damondouglas Feb 21, 2024
b9923b6
Make Throttle package private
damondouglas Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdks/java/io/rrio/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.apache.tools.ant.taskdefs.ExecuteJava

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -32,10 +34,9 @@ var protobufVersion = "3.21.5"

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.commons_math3
implementation library.java.joda_time
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation "redis.clients:jedis:$jedisVersion"

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
*/
package org.apache.beam.io.requestresponse;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.util.Optional;
import org.apache.beam.sdk.schemas.AutoValueSchema;
Expand All @@ -36,19 +32,18 @@
@AutoValue
public abstract class ApiIOError {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Instantiate an {@link ApiIOError} from an {@link ErrorT} {@link T} element. The {@link T}
* element is converted to a JSON string.
* element is converted to a string by calling {@link Object#toString()}.
*/
static <T, ErrorT extends Exception> ApiIOError of(ErrorT e, T element)
throws JsonProcessingException {

String json = OBJECT_MAPPER.writeValueAsString(checkStateNotNull(element));
static <T, ErrorT extends Exception> ApiIOError of(ErrorT e, T element) {
String request = "";
if (element != null) {
request = element.toString();
}

return ApiIOError.builder()
.setRequestAsJsonString(json)
.setRequestAsString(request)
.setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
.setObservedTimestamp(Instant.now())
.setStackTrace(Throwables.getStackTraceAsString(e))
Expand All @@ -59,8 +54,8 @@ static Builder builder() {
return new AutoValue_ApiIOError.Builder();
}

/** The JSON string representation of the request associated with the error. */
public abstract String getRequestAsJsonString();
/** The {@link Object#toString()} representation of the request associated with the error. */
public abstract String getRequestAsString();

/** The observed timestamp of the error. */
public abstract Instant getObservedTimestamp();
Expand All @@ -74,7 +69,7 @@ static Builder builder() {
@AutoValue.Builder
abstract static class Builder {

abstract Builder setRequestAsJsonString(String value);
abstract Builder setRequestAsString(String value);

abstract Builder setObservedTimestamp(Instant value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -302,8 +301,7 @@ public void teardown() throws UserCodeExecutionException {
}

@ProcessElement
public void process(@Element RequestT request, MultiOutputReceiver receiver)
throws JsonProcessingException {
public void process(@Element RequestT request, MultiOutputReceiver receiver) {

BackOff backOff = configuration.getBackOffSupplier().get();
Sleeper sleeper = configuration.getSleeperSupplier().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ public static Builder builder() {
/** Count {@link Cache} write failures. */
public abstract Boolean getCountCacheWriteFailures();

public static Monitoring defaultInstance() {
return Monitoring.builder().build();
}

/**
* Turns on all monitoring. The purpose of this method is, when used with {@link #toBuilder} and
* other setters, to turn everything on except for a few select counters.
Expand Down Expand Up @@ -237,6 +241,16 @@ static void incIfPresent(@Nullable Counter counter) {
}
}

/**
* Like {@link #incIfPresent(Counter)} but allows for incrementing by a value specified as an
* argument.
*/
static void incIfPresent(@Nullable Counter counter, int by) {
if (counter != null) {
counter.inc(by);
}
}

@AutoValue.Builder
public abstract static class Builder {

Expand Down Expand Up @@ -306,7 +320,7 @@ public abstract static class Builder {

abstract Monitoring autoBuild();

final Monitoring build() {
public final Monitoring build() {
if (!getCountRequests().isPresent()) {
setCountRequests(false);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponse;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.io.Serializable;
import org.joda.time.Duration;

/** Configures the number of elements at most emitted within a time interval. */
public class Rate implements Serializable {

/**
* Instantiate a {@link Rate} with numElements and {@link Duration} interval. The arguments
* numElements and interval's {@link Duration#getMillis()} must be > 0.
*/
public static Rate of(int numElements, Duration interval) {
return new Rate(numElements, interval);
}

private final int numElements;
private final Duration interval;

private Rate(int numElements, Duration interval) {
checkArgument(numElements > 0);
checkArgument(interval.getMillis() > 0);
this.numElements = numElements;
this.interval = interval;
}

/** Gets the number of elements to emit within {@link #getInterval()}. */
public int getNumElements() {
return numElements;
}

/** Gets the {@link Duration} to emit {@link #getNumElements()}. */
public Duration getInterval() {
return interval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,39 @@
* result.getResponses().apply( ... );
* result.getFailures().apply( ... );
* }</pre>
*
* <h2>API quota usage strategies</h2>
*
* Many Web APIs limit usage to prevent abuse rendering consumption via parallelized workloads such
* as Beam problematic. This package provides several strategies to manage API quota usage. By
* default, {@link RequestResponseIO} employs adaptive throttling and request repeating. Additional
* options are discussed below.
*
* <h3>Throttling</h3>
*
* Users may wish to throttle the {@link RequestT} {@link PCollection} before applying {@link
* RequestResponseIO} as shown by example below. See {@link Throttle} for more details.
*
* <pre>{@code
* Rate rate = Rate.of(10, Duration.standardSeconds(1L));
* PCollection<RequestT> original ...
* PCollection<RequestT> throttled = original.apply(Throttle.of(rate));
* Result result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder));
* }</pre>
*
* <h3>Caching</h3>
*
* Users may optionally enable caching or associated {@link RequestT}s and {@link ResponseT}s to
* reduce the burden of API usage as shown below. For cache support, see {@link Cache#usingRedis}
* and {@link RequestResponseIO#withCache} for more details.
*
* <pre>{@code
* requests.apply(
* RequestResponseIO
* .of(new MyCaller(), responseCoder)
* .withCache(Cache.usingRedis(uri, requestCoder, responseCoder, expiry))
* )
* }</pre>
*/
public class RequestResponseIO<RequestT, ResponseT>
extends PTransform<PCollection<RequestT>, Result<ResponseT>> {
Expand Down Expand Up @@ -256,23 +289,12 @@ public RequestResponseIO<RequestT, ResponseT> withCache(Cache.Pair<RequestT, Res
callConfiguration);
}

/** Configures the transform with {@link Monitoring}, turned off by default. */
public RequestResponseIO<RequestT, ResponseT> withMonitoringConfiguration(Monitoring value) {
return new RequestResponseIO<>(
rrioConfiguration, callConfiguration.toBuilder().setMonitoringConfiguration(value).build());
}

/**
* Configures {@link RequestResponseIO} with a {@link PTransform} that holds back {@link
* RequestT}s to prevent quota errors such as HTTP 429 or gRPC RESOURCE_EXHAUSTION errors.
*/
// TODO(damondouglas): Until https://github.com/apache/beam/issues/28930 there is no provided
// solution for this, however this method allows users to provide their own at this time.
public RequestResponseIO<RequestT, ResponseT> withPreventiveThrottle(
PTransform<PCollection<RequestT>, Result<RequestT>> throttle) {
return new RequestResponseIO<>(
rrioConfiguration.toBuilder().setThrottle(throttle).build(), callConfiguration);
}

/** Exposes the transform's {@link Call.Configuration} for testing. */
@VisibleForTesting
Call.Configuration<RequestT, ResponseT> getCallConfiguration() {
Expand Down Expand Up @@ -351,7 +373,7 @@ public Result<ResponseT> expand(PCollection<RequestT> input) {

// Throttle the RequestT input PCollection.
Pair<PCollection<RequestT>, PCollectionList<ApiIOError>> throttle =
expandThrottle(input, failureList);
expandPreventiveThrottle(input, failureList);
input = throttle.getLeft();
failureList = throttle.getRight();

Expand Down Expand Up @@ -434,9 +456,7 @@ public Result<ResponseT> expand(PCollection<RequestT> input) {
* <li>Returns appended {@link PCollection} of {@link ApiIOError}s to the failureList.</li>
* </ol></pre>
*/
// TODO(damondouglas): See https://github.com/apache/beam/issues/28930; currently there is no
// provided solution for this, though users could provide their own via withThrottle.
Pair<PCollection<RequestT>, PCollectionList<ApiIOError>> expandThrottle(
Pair<PCollection<RequestT>, PCollectionList<ApiIOError>> expandPreventiveThrottle(
PCollection<RequestT> input, PCollectionList<ApiIOError> failureList) {

if (rrioConfiguration.getThrottle() == null) {
Expand Down
Loading
Loading