From 0c6a9765afe3e3e0ab06294af6933e9fdd36efb6 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Sun, 19 Nov 2023 12:00:18 -0800 Subject: [PATCH] Define repeatable exception types --- .../beam/io/requestresponse/Repeater.java | 40 ++++++-- .../UserCodeRemoteSystemException.java | 41 ++++++++ .../beam/io/requestresponse/RepeaterTest.java | 97 +++++++++++++------ 3 files changed, 137 insertions(+), 41 deletions(-) create mode 100644 sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.java diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java index 1599080f405ac..4b80ed899d259 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java @@ -19,18 +19,32 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import com.google.api.client.util.BackOff; import com.google.api.client.util.ExponentialBackOff; import java.io.IOException; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** Repeats a method invocation when it encounters an error. */ public class Repeater { + /** + * {@link Set} of {@link UserCodeExecutionException}s that warrant repeating. A public modifier is + * applied to communicate to users of this class which {@link UserCodeExecutionException}s + * constitute warrant repeat execution. + */ + public static final Set> REPEATABLE_ERROR_TYPES = + ImmutableSet.of( + UserCodeRemoteSystemException.class, + UserCodeTimeoutException.class, + UserCodeQuotaException.class); + /** Instantiates a {@link Repeater}. */ public static Repeater of( ThrowableFunction throwableFunction, Sleeper sleeper, Integer limit) { @@ -51,8 +65,8 @@ private Repeater( /** * Applies the {@link InputT} to the {@link ThrowableFunction}. If the function throws an - * exception, repeats the invocation up to the limit. Throws the last exception, if the limit - * reached. + * exception that {@link #REPEATABLE_ERROR_TYPES} contains, repeats the invocation up to the + * limit, otherwise throws immediately. Throws the last exception, if the limit reached. */ public OutputT apply(InputT input) throws UserCodeExecutionException, InterruptedException { @MonotonicNonNull UserCodeExecutionException lastException = null; @@ -60,6 +74,9 @@ public OutputT apply(InputT input) throws UserCodeExecutionException, Interrupte try { return throwableFunction.apply(input); } catch (UserCodeExecutionException e) { + if (!REPEATABLE_ERROR_TYPES.contains(e.getClass())) { + throw e; + } lastException = e; sleeper.sleep(); } @@ -85,26 +102,31 @@ public interface Sleeper { } /** - * A {@link Sleeper} implementation that uses {@link ExponentialBackOff} to determine how long to - * pause execution. + * A {@link Sleeper} implementation that uses a {@link BackOff} to determine how long to pause + * execution. */ public static class DefaultSleeper implements Sleeper { public static DefaultSleeper of() { - return new DefaultSleeper(); + return of(new ExponentialBackOff()); } - private DefaultSleeper() {} + public static DefaultSleeper of(BackOff backOff) { + return new DefaultSleeper(backOff); + } + + private final BackOff backOff; - private final ExponentialBackOff exponentialBackOff = new ExponentialBackOff(); + private DefaultSleeper(BackOff backOff) { + this.backOff = backOff; + } @Override public void sleep() throws InterruptedException { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); try { Future future = - executorService.schedule( - () -> {}, exponentialBackOff.nextBackOffMillis(), TimeUnit.MILLISECONDS); + executorService.schedule(() -> {}, backOff.nextBackOffMillis(), TimeUnit.MILLISECONDS); future.get(); } catch (IOException | ExecutionException e) { throw new RuntimeException(e); diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.java new file mode 100644 index 0000000000000..dac16344bec7b --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +/** + * A {@link UserCodeExecutionException} that signals an error with a remote system. Examples of such + * errors include an HTTP 5XX error or gRPC INTERNAL (13) error. + */ +public class UserCodeRemoteSystemException extends UserCodeExecutionException { + public UserCodeRemoteSystemException(String message) { + super(message); + } + + public UserCodeRemoteSystemException(String message, Throwable cause) { + super(message, cause); + } + + public UserCodeRemoteSystemException(Throwable cause) { + super(cause); + } + + public UserCodeRemoteSystemException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RepeaterTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RepeaterTest.java index 4eb357b8fe464..ae05c67cd0423 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RepeaterTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RepeaterTest.java @@ -17,9 +17,12 @@ */ package org.apache.beam.io.requestresponse; -import static org.junit.Assert.assertThrows; +import static org.apache.beam.io.requestresponse.Repeater.REPEATABLE_ERROR_TYPES; import static org.junit.Assert.assertTrue; +import java.lang.reflect.InvocationTargetException; +import java.util.Optional; + import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -28,7 +31,6 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; @@ -43,45 +45,38 @@ public class RepeaterTest { private static final int LIMIT = 3; @Test - public void givenErrorsWithinLimit_yieldsOutput() { - PCollectionTuple pct = - pipeline - .apply(Create.of(1)) - .apply( - ParDo.of(new DoFnWithRepeaters(new CallerImpl(1), new SetupTeardownImpl(1))) - .withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG))); - - PAssert.that(pct.get(OUTPUT_TAG)).containsInAnyOrder(2); - PAssert.that(pct.get(FAILURE_TAG)).empty(); - pipeline.run(); - } - - @Test - public void givenSetupErrorsAtLimit_throws() { + public void givenCallerQuotaErrorsAtLimit_emitsIntoFailurePCollection() { PCollectionTuple pct = pipeline .apply(Create.of(1)) .apply( - ParDo.of(new DoFnWithRepeaters(new CallerImpl(0), new SetupTeardownImpl(LIMIT))) + ParDo.of( + new DoFnWithRepeaters( + new CallerImpl(LIMIT, UserCodeQuotaException.class), + new SetupTeardownImpl(0))) .withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG))); PAssert.that(pct.get(OUTPUT_TAG)).empty(); - PAssert.that(pct.get(FAILURE_TAG)).empty(); - assertThrows(UncheckedExecutionException.class, pipeline::run); - } + PAssert.that(pct.get(FAILURE_TAG)) + .containsInAnyOrder(UserCodeQuotaException.class.getName()); + pipeline.run(); + } @Test - public void givenCallerErrorsAtLimit_throws() { + public void givenCallerTimeoutErrorsAtLimit_emitsIntoFailurePCollection() { PCollectionTuple pct = - pipeline - .apply(Create.of(1)) - .apply( - ParDo.of(new DoFnWithRepeaters(new CallerImpl(LIMIT), new SetupTeardownImpl(0))) - .withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG))); + pipeline + .apply(Create.of(1)) + .apply( + ParDo.of( + new DoFnWithRepeaters( + new CallerImpl(LIMIT, UserCodeTimeoutException.class), + new SetupTeardownImpl(0))) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG))); PAssert.that(pct.get(OUTPUT_TAG)).empty(); PAssert.that(pct.get(FAILURE_TAG)) - .containsInAnyOrder(UserCodeExecutionException.class.getName()); + .containsInAnyOrder(UserCodeTimeoutException.class.getName()); pipeline.run(); } @@ -130,7 +125,7 @@ public void process(@Element Integer element, MultiOutputReceiver receiver) { try { receiver.get(OUTPUT_TAG).output(repeater.apply(element)); } catch (UserCodeExecutionException e) { - receiver.get(FAILURE_TAG).output(UserCodeExecutionException.class.getName()); + receiver.get(FAILURE_TAG).output(e.getMessage()); } catch (InterruptedException ignored) { } } @@ -139,16 +134,32 @@ public void process(@Element Integer element, MultiOutputReceiver receiver) { private static class CallerImpl implements Caller { private int wantNumErrors; + private final Class wantThrowWith; + private final String exceptionName; private CallerImpl(int wantNumErrors) { + this(wantNumErrors, UserCodeExecutionException.class); + } + + private CallerImpl( + int wantNumErrors, Class wantThrowWith) { this.wantNumErrors = wantNumErrors; + this.wantThrowWith = wantThrowWith; + this.exceptionName = getRepeatableErrorTypeName(wantThrowWith); } @Override public Integer call(Integer request) throws UserCodeExecutionException { wantNumErrors--; if (wantNumErrors > 0) { - throw new UserCodeExecutionException(""); + try { + throw wantThrowWith.getConstructor(String.class).newInstance(exceptionName); + } catch (InstantiationException + | NoSuchMethodException + | InvocationTargetException + | IllegalAccessException e) { + throw new RuntimeException(e); + } } return request * 2; } @@ -156,16 +167,29 @@ public Integer call(Integer request) throws UserCodeExecutionException { private static class SetupTeardownImpl implements SetupTeardown { private int wantNumErrors; + private final Class wantThrowWith; + private final String exceptionName; - public SetupTeardownImpl(int wantNumErrors) { + private SetupTeardownImpl(int wantNumErrors) { + this(wantNumErrors, UserCodeExecutionException.class); + } + + private SetupTeardownImpl( + int wantNumErrors, Class wantThrowWith) { this.wantNumErrors = wantNumErrors; + this.wantThrowWith = wantThrowWith; + this.exceptionName = getRepeatableErrorTypeName(wantThrowWith); } @Override public void setup() throws UserCodeExecutionException { wantNumErrors--; if (wantNumErrors > 0) { - throw new UserCodeExecutionException(""); + try { + throw wantThrowWith.getConstructor(String.class).newInstance(exceptionName); + } catch (InstantiationException | NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } } } @@ -185,4 +209,13 @@ public void sleep() throws InterruptedException { Thread.sleep(sleepFor); } } + + private static String getRepeatableErrorTypeName(Class e) { + for(Class ex : REPEATABLE_ERROR_TYPES) { + if (ex.equals(e)) { + return ex.getName(); + } + } + return e.getName(); + } }