From 910c7cc218f2edca8cca24615a353ed168015fbd Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Fri, 9 Aug 2024 09:39:37 -0500 Subject: [PATCH] Retry ConnectException, add retry logging (#6614) --- .../sender/jdk/internal/JdkHttpSender.java | 52 ++++++++++++-- .../jdk/internal/JdkHttpSenderTest.java | 44 +++++++++++- .../okhttp/internal/RetryInterceptor.java | 70 ++++++++++++++++--- .../okhttp/internal/RetryInterceptorTest.java | 30 ++++++++ 4 files changed, 180 insertions(+), 16 deletions(-) diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 22c0609e53d..c79aaf6162f 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -5,6 +5,8 @@ package io.opentelemetry.exporter.sender.jdk.internal; +import static java.util.stream.Collectors.joining; + import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.exporter.internal.marshal.Marshaler; @@ -25,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -33,6 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; @@ -52,6 +57,8 @@ public final class JdkHttpSender implements HttpSender { private static final ThreadLocal threadLocalByteBufPool = ThreadLocal.withInitial(ByteBufferPool::new); + private static final Logger logger = Logger.getLogger(JdkHttpSender.class.getName()); + private final ExecutorService executorService = Executors.newFixedThreadPool(5); private final HttpClient client; private final URI uri; @@ -211,11 +218,37 @@ HttpResponse sendInternal(Marshaler marshaler) throws IOException { exception = e; } - if (httpResponse != null && !retryableStatusCodes.contains(httpResponse.statusCode())) { - return httpResponse; + if (httpResponse != null) { + boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode()); + if (logger.isLoggable(Level.FINER)) { + logger.log( + Level.FINER, + "Attempt " + + attempt + + " returned " + + (retryable ? "retryable" : "non-retryable") + + " response: " + + responseStringRepresentation(httpResponse)); + } + if (!retryable) { + return httpResponse; + } } - if (exception != null && !isRetryableException(exception)) { - throw exception; + if (exception != null) { + boolean retryable = isRetryableException(exception); + if (logger.isLoggable(Level.FINER)) { + logger.log( + Level.FINER, + "Attempt " + + attempt + + " failed with " + + (retryable ? "retryable" : "non-retryable") + + " exception", + exception); + } + if (!retryable) { + throw exception; + } } } while (attempt < retryPolicy.getMaxAttempts()); @@ -225,6 +258,17 @@ HttpResponse sendInternal(Marshaler marshaler) throws IOException { throw exception; } + private static String responseStringRepresentation(HttpResponse response) { + StringJoiner joiner = new StringJoiner(",", "HttpResponse{", "}"); + joiner.add("code=" + response.statusCode()); + joiner.add( + "headers=" + + response.headers().map().entrySet().stream() + .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue())) + .collect(joining(",", "[", "]"))); + return joiner.toString(); + } + private void write(Marshaler marshaler, OutputStream os) throws IOException { if (exportAsJson) { marshaler.writeJsonTo(os); diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index 79a06521c5b..5df723b6560 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -18,6 +18,8 @@ import io.opentelemetry.exporter.internal.marshal.Serializer; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.IOException; +import java.net.ConnectException; +import java.net.ServerSocket; import java.net.http.HttpClient; import java.net.http.HttpConnectTimeoutException; import java.time.Duration; @@ -53,8 +55,8 @@ void setup() throws IOException, InterruptedException { sender = new JdkHttpSender( mockHttpClient, - "http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection - // timeout + // Connecting to a non-routable IP address to trigger connection timeout + "http://10.255.255.1", null, false, "text/plain", @@ -74,6 +76,44 @@ void sendInternal_RetryableConnectTimeoutException() throws IOException, Interru verify(mockHttpClient, times(2)).send(any(), any()); } + @Test + void sendInternal_RetryableConnectException() throws IOException, InterruptedException { + sender = + new JdkHttpSender( + mockHttpClient, + // Connecting to localhost on an unused port address to trigger + // java.net.ConnectException (or java.net.http.HttpConnectTimeoutException on linux java + // 11+) + "http://localhost:" + freePort(), + null, + false, + "text/plain", + Duration.ofSeconds(10).toNanos(), + Collections::emptyMap, + RetryPolicy.builder() + .setMaxAttempts(2) + .setInitialBackoff(Duration.ofMillis(1)) + .build()); + + assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler())) + .satisfies( + e -> + assertThat( + (e instanceof ConnectException) + || (e instanceof HttpConnectTimeoutException)) + .isTrue()); + + verify(mockHttpClient, times(2)).send(any(), any()); + } + + private static int freePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Test void sendInternal_RetryableIoException() throws IOException, InterruptedException { doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any()); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java index ee7d5fc9177..405c54f1945 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java @@ -5,13 +5,19 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import static java.util.stream.Collectors.joining; + import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.IOException; +import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.Locale; +import java.util.StringJoiner; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.logging.Level; +import java.util.logging.Logger; import okhttp3.Interceptor; import okhttp3.Response; @@ -23,6 +29,8 @@ */ public final class RetryInterceptor implements Interceptor { + private static final Logger logger = Logger.getLogger(RetryInterceptor.class.getName()); + private final RetryPolicy retryPolicy; private final Function isRetryable; private final Function isRetryableException; @@ -84,12 +92,39 @@ public Response intercept(Chain chain) throws IOException { } catch (IOException e) { exception = e; } - if (response != null && !Boolean.TRUE.equals(isRetryable.apply(response))) { - return response; + if (response != null) { + boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response)); + if (logger.isLoggable(Level.FINER)) { + logger.log( + Level.FINER, + "Attempt " + + attempt + + " returned " + + (retryable ? "retryable" : "non-retryable") + + " response: " + + responseStringRepresentation(response)); + } + if (!retryable) { + return response; + } } - if (exception != null && !Boolean.TRUE.equals(isRetryableException.apply(exception))) { - throw exception; + if (exception != null) { + boolean retryable = Boolean.TRUE.equals(isRetryableException.apply(exception)); + if (logger.isLoggable(Level.FINER)) { + logger.log( + Level.FINER, + "Attempt " + + attempt + + " failed with " + + (retryable ? "retryable" : "non-retryable") + + " exception", + exception); + } + if (!retryable) { + throw exception; + } } + } while (attempt < retryPolicy.getMaxAttempts()); if (response != null) { @@ -98,15 +133,30 @@ public Response intercept(Chain chain) throws IOException { throw exception; } + private static String responseStringRepresentation(Response response) { + StringJoiner joiner = new StringJoiner(",", "Response{", "}"); + joiner.add("code=" + response.code()); + joiner.add( + "headers=" + + response.headers().toMultimap().entrySet().stream() + .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue())) + .collect(joining(",", "[", "]"))); + return joiner.toString(); + } + // Visible for testing static boolean isRetryableException(IOException e) { - if (!(e instanceof SocketTimeoutException)) { - return false; + if (e instanceof SocketTimeoutException) { + String message = e.getMessage(); + // Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect + // timed out" + return message == null || message.toLowerCase(Locale.ROOT).contains("connect timed out"); + } else if (e instanceof ConnectException) { + // Exceptions resemble: java.net.ConnectException: Failed to connect to + // localhost/[0:0:0:0:0:0:0:1]:62611 + return true; } - String message = e.getMessage(); - // Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect timed - // out" - return message == null || message.toLowerCase(Locale.ROOT).contains("connect timed out"); + return false; } // Visible for testing diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java index f4b644a36f3..1a6656c6217 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java @@ -23,6 +23,8 @@ import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.IOException; +import java.net.ConnectException; +import java.net.ServerSocket; import java.net.SocketTimeoutException; import java.time.Duration; import java.util.concurrent.TimeUnit; @@ -157,6 +159,34 @@ void connectTimeout() throws Exception { verify(sleeper, times(4)).sleep(anyLong()); } + @Test + void connectException() throws Exception { + client = connectTimeoutClient(); + when(random.get(anyLong())).thenReturn(1L); + doNothing().when(sleeper).sleep(anyLong()); + + // Connecting to localhost on an unused port address to trigger java.net.ConnectException + int openPort = freePort(); + assertThatThrownBy( + () -> + client + .newCall(new Request.Builder().url("http://localhost:" + openPort).build()) + .execute()) + .isInstanceOf(ConnectException.class); + + verify(isRetryableException, times(5)).apply(any()); + // Should retry maxAttempts, and sleep maxAttempts - 1 times + verify(sleeper, times(4)).sleep(anyLong()); + } + + private static int freePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Test void nonRetryableException() throws InterruptedException { client = connectTimeoutClient();