Skip to content

Commit

Permalink
Define repeatable exception types
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas committed Nov 22, 2023
1 parent 520cf79 commit 0c6a976
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,32 @@

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

/** Repeats a method invocation when it encounters an error. */
public class Repeater<InputT, OutputT> {

/**
* {@link Set} of {@link UserCodeExecutionException}s that warrant repeating. A public modifier is
* applied to communicate to users of this class which {@link UserCodeExecutionException}s
* constitute warrant repeat execution.
*/
public static final Set<Class<? extends UserCodeExecutionException>> REPEATABLE_ERROR_TYPES =
ImmutableSet.of(
UserCodeRemoteSystemException.class,
UserCodeTimeoutException.class,
UserCodeQuotaException.class);

/** Instantiates a {@link Repeater}. */
public static <InputT, OutputT> Repeater<InputT, OutputT> of(
ThrowableFunction<InputT, OutputT> throwableFunction, Sleeper sleeper, Integer limit) {
Expand All @@ -51,15 +65,18 @@ private Repeater(

/**
* Applies the {@link InputT} to the {@link ThrowableFunction}. If the function throws an
* exception, repeats the invocation up to the limit. Throws the last exception, if the limit
* reached.
* exception that {@link #REPEATABLE_ERROR_TYPES} contains, repeats the invocation up to the
* limit, otherwise throws immediately. Throws the last exception, if the limit reached.
*/
public OutputT apply(InputT input) throws UserCodeExecutionException, InterruptedException {
@MonotonicNonNull UserCodeExecutionException lastException = null;
for (int i = 0; i < limit - 1; i++) {
try {
return throwableFunction.apply(input);
} catch (UserCodeExecutionException e) {
if (!REPEATABLE_ERROR_TYPES.contains(e.getClass())) {
throw e;
}
lastException = e;
sleeper.sleep();
}
Expand All @@ -85,26 +102,31 @@ public interface Sleeper {
}

/**
* A {@link Sleeper} implementation that uses {@link ExponentialBackOff} to determine how long to
* pause execution.
* A {@link Sleeper} implementation that uses a {@link BackOff} to determine how long to pause
* execution.
*/
public static class DefaultSleeper implements Sleeper {

public static DefaultSleeper of() {
return new DefaultSleeper();
return of(new ExponentialBackOff());
}

private DefaultSleeper() {}
public static DefaultSleeper of(BackOff backOff) {
return new DefaultSleeper(backOff);
}

private final BackOff backOff;

private final ExponentialBackOff exponentialBackOff = new ExponentialBackOff();
private DefaultSleeper(BackOff backOff) {
this.backOff = backOff;
}

@Override
public void sleep() throws InterruptedException {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
try {
Future<?> future =
executorService.schedule(
() -> {}, exponentialBackOff.nextBackOffMillis(), TimeUnit.MILLISECONDS);
executorService.schedule(() -> {}, backOff.nextBackOffMillis(), TimeUnit.MILLISECONDS);
future.get();
} catch (IOException | ExecutionException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponse;

/**
* A {@link UserCodeExecutionException} that signals an error with a remote system. Examples of such
* errors include an HTTP 5XX error or gRPC INTERNAL (13) error.
*/
public class UserCodeRemoteSystemException extends UserCodeExecutionException {
public UserCodeRemoteSystemException(String message) {
super(message);
}

public UserCodeRemoteSystemException(String message, Throwable cause) {
super(message, cause);
}

public UserCodeRemoteSystemException(Throwable cause) {
super(cause);
}

public UserCodeRemoteSystemException(
String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
*/
package org.apache.beam.io.requestresponse;

import static org.junit.Assert.assertThrows;
import static org.apache.beam.io.requestresponse.Repeater.REPEATABLE_ERROR_TYPES;
import static org.junit.Assert.assertTrue;

import java.lang.reflect.InvocationTargetException;
import java.util.Optional;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand All @@ -28,7 +31,6 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
Expand All @@ -43,45 +45,38 @@ public class RepeaterTest {
private static final int LIMIT = 3;

@Test
public void givenErrorsWithinLimit_yieldsOutput() {
PCollectionTuple pct =
pipeline
.apply(Create.of(1))
.apply(
ParDo.of(new DoFnWithRepeaters(new CallerImpl(1), new SetupTeardownImpl(1)))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));

PAssert.that(pct.get(OUTPUT_TAG)).containsInAnyOrder(2);
PAssert.that(pct.get(FAILURE_TAG)).empty();
pipeline.run();
}

@Test
public void givenSetupErrorsAtLimit_throws() {
public void givenCallerQuotaErrorsAtLimit_emitsIntoFailurePCollection() {
PCollectionTuple pct =
pipeline
.apply(Create.of(1))
.apply(
ParDo.of(new DoFnWithRepeaters(new CallerImpl(0), new SetupTeardownImpl(LIMIT)))
ParDo.of(
new DoFnWithRepeaters(
new CallerImpl(LIMIT, UserCodeQuotaException.class),
new SetupTeardownImpl(0)))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));

PAssert.that(pct.get(OUTPUT_TAG)).empty();
PAssert.that(pct.get(FAILURE_TAG)).empty();
assertThrows(UncheckedExecutionException.class, pipeline::run);
}
PAssert.that(pct.get(FAILURE_TAG))
.containsInAnyOrder(UserCodeQuotaException.class.getName());

pipeline.run();
}
@Test
public void givenCallerErrorsAtLimit_throws() {
public void givenCallerTimeoutErrorsAtLimit_emitsIntoFailurePCollection() {
PCollectionTuple pct =
pipeline
.apply(Create.of(1))
.apply(
ParDo.of(new DoFnWithRepeaters(new CallerImpl(LIMIT), new SetupTeardownImpl(0)))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
pipeline
.apply(Create.of(1))
.apply(
ParDo.of(
new DoFnWithRepeaters(
new CallerImpl(LIMIT, UserCodeTimeoutException.class),
new SetupTeardownImpl(0)))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));

PAssert.that(pct.get(OUTPUT_TAG)).empty();
PAssert.that(pct.get(FAILURE_TAG))
.containsInAnyOrder(UserCodeExecutionException.class.getName());
.containsInAnyOrder(UserCodeTimeoutException.class.getName());

pipeline.run();
}
Expand Down Expand Up @@ -130,7 +125,7 @@ public void process(@Element Integer element, MultiOutputReceiver receiver) {
try {
receiver.get(OUTPUT_TAG).output(repeater.apply(element));
} catch (UserCodeExecutionException e) {
receiver.get(FAILURE_TAG).output(UserCodeExecutionException.class.getName());
receiver.get(FAILURE_TAG).output(e.getMessage());
} catch (InterruptedException ignored) {
}
}
Expand All @@ -139,33 +134,62 @@ public void process(@Element Integer element, MultiOutputReceiver receiver) {
private static class CallerImpl implements Caller<Integer, Integer> {

private int wantNumErrors;
private final Class<? extends UserCodeExecutionException> wantThrowWith;
private final String exceptionName;

private CallerImpl(int wantNumErrors) {
this(wantNumErrors, UserCodeExecutionException.class);
}

private CallerImpl(
int wantNumErrors, Class<? extends UserCodeExecutionException> wantThrowWith) {
this.wantNumErrors = wantNumErrors;
this.wantThrowWith = wantThrowWith;
this.exceptionName = getRepeatableErrorTypeName(wantThrowWith);
}

@Override
public Integer call(Integer request) throws UserCodeExecutionException {
wantNumErrors--;
if (wantNumErrors > 0) {
throw new UserCodeExecutionException("");
try {
throw wantThrowWith.getConstructor(String.class).newInstance(exceptionName);
} catch (InstantiationException
| NoSuchMethodException
| InvocationTargetException
| IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return request * 2;
}
}

private static class SetupTeardownImpl implements SetupTeardown {
private int wantNumErrors;
private final Class<? extends UserCodeExecutionException> wantThrowWith;
private final String exceptionName;

public SetupTeardownImpl(int wantNumErrors) {
private SetupTeardownImpl(int wantNumErrors) {
this(wantNumErrors, UserCodeExecutionException.class);
}

private SetupTeardownImpl(
int wantNumErrors, Class<? extends UserCodeExecutionException> wantThrowWith) {
this.wantNumErrors = wantNumErrors;
this.wantThrowWith = wantThrowWith;
this.exceptionName = getRepeatableErrorTypeName(wantThrowWith);
}

@Override
public void setup() throws UserCodeExecutionException {
wantNumErrors--;
if (wantNumErrors > 0) {
throw new UserCodeExecutionException("");
try {
throw wantThrowWith.getConstructor(String.class).newInstance(exceptionName);
} catch (InstantiationException | NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}

Expand All @@ -185,4 +209,13 @@ public void sleep() throws InterruptedException {
Thread.sleep(sleepFor);
}
}

private static String getRepeatableErrorTypeName(Class<? extends UserCodeExecutionException> e) {
for(Class<? extends UserCodeExecutionException> ex : REPEATABLE_ERROR_TYPES) {
if (ex.equals(e)) {
return ex.getName();
}
}
return e.getName();
}
}

0 comments on commit 0c6a976

Please sign in to comment.