Skip to content

Commit

Permalink
JdkHttpSender should retry on connect exceptions (#5867)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Oct 23, 2023
1 parent 82a0e03 commit e06d35a
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 31 deletions.
5 changes: 5 additions & 0 deletions exporters/sender/jdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ tasks {
options.release.set(11)
}
}

tasks.test {
val testJavaVersion: String? by project
enabled = !testJavaVersion.equals("8")
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -56,19 +57,16 @@ public final class JdkHttpSender implements HttpSender {
private final Supplier<Map<String, String>> headerSupplier;
@Nullable private final RetryPolicy retryPolicy;

// Visible for testing
JdkHttpSender(
HttpClient client,
String endpoint,
boolean compressionEnabled,
String contentType,
long timeoutNanos,
Supplier<Map<String, String>> headerSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext) {
HttpClient.Builder builder = HttpClient.newBuilder().executor(executorService);
if (sslContext != null) {
builder.sslContext(sslContext);
}
this.client = builder.build();
@Nullable RetryPolicy retryPolicy) {
this.client = client;
try {
this.uri = new URI(endpoint);
} catch (URISyntaxException e) {
Expand All @@ -81,14 +79,52 @@ public final class JdkHttpSender implements HttpSender {
this.retryPolicy = retryPolicy;
}

JdkHttpSender(
String endpoint,
boolean compressionEnabled,
String contentType,
long timeoutNanos,
Supplier<Map<String, String>> headerSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext) {
this(
configureClient(sslContext),
endpoint,
compressionEnabled,
contentType,
timeoutNanos,
headerSupplier,
retryPolicy);
}

private static HttpClient configureClient(@Nullable SSLContext sslContext) {
HttpClient.Builder builder =
HttpClient.newBuilder()
// Aligned with OkHttpClient default connect timeout
// TODO (jack-berg): Consider making connect timeout configurable
.connectTimeout(Duration.ofSeconds(10));
if (sslContext != null) {
builder.sslContext(sslContext);
}
return builder.build();
}

@Override
public void send(
Consumer<OutputStream> marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
CompletableFuture<HttpResponse<byte[]>> unused =
CompletableFuture.supplyAsync(() -> sendInternal(marshaler), executorService)
CompletableFuture.supplyAsync(
() -> {
try {
return sendInternal(marshaler);
} catch (IOException e) {
throw new IllegalStateException(e);
}
},
executorService)
.whenComplete(
(httpResponse, throwable) -> {
if (throwable != null) {
Expand All @@ -99,7 +135,8 @@ public void send(
});
}

private HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) {
// Visible for testing
HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOException {
long startTimeNanos = System.nanoTime();
HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder().uri(uri).timeout(Duration.ofNanos(timeoutNanos));
Expand Down Expand Up @@ -129,46 +166,64 @@ private HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) {

long attempt = 0;
long nextBackoffNanos = retryPolicy.getInitialBackoff().toNanos();
HttpResponse<byte[]> httpResponse = null;
IOException exception = null;
do {
requestBuilder.timeout(Duration.ofNanos(timeoutNanos - (System.nanoTime() - startTimeNanos)));
HttpResponse<byte[]> httpResponse = sendRequest(requestBuilder, byteBufferPool);
attempt++;
if (attempt >= retryPolicy.getMaxAttempts()
|| !retryableStatusCodes.contains(httpResponse.statusCode())) {
return httpResponse;
if (attempt > 0) {
// Compute and sleep for backoff
long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos());
long backoffNanos = ThreadLocalRandom.current().nextLong(upperBoundNanos);
nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier());
try {
TimeUnit.NANOSECONDS.sleep(backoffNanos);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break; // Break out and return response or throw
}
// If after sleeping we've exceeded timeoutNanos, break out and return response or throw
if ((System.nanoTime() - startTimeNanos) >= timeoutNanos) {
break;
}
}

// Compute and sleep for backoff
long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos());
long backoffNanos = ThreadLocalRandom.current().nextLong(upperBoundNanos);
nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier());
attempt++;
requestBuilder.timeout(Duration.ofNanos(timeoutNanos - (System.nanoTime() - startTimeNanos)));
try {
TimeUnit.NANOSECONDS.sleep(backoffNanos);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
httpResponse = sendRequest(requestBuilder, byteBufferPool);
} catch (IOException e) {
exception = e;
}
if ((System.nanoTime() - startTimeNanos) >= timeoutNanos) {

if (httpResponse != null && !retryableStatusCodes.contains(httpResponse.statusCode())) {
return httpResponse;
}
} while (true);
if (exception != null && !isRetryableException(exception)) {
throw exception;
}
} while (attempt < retryPolicy.getMaxAttempts());

if (httpResponse != null) {
return httpResponse;
}
throw exception;
}

private HttpResponse<byte[]> sendRequest(
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) {
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException {
try {
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray());
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
// TODO: is throwable retryable?
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
} finally {
byteBufferPool.resetPool();
}
}

private static boolean isRetryableException(IOException throwable) {
return throwable instanceof HttpTimeoutException;
}

private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream {
NoCopyByteArrayOutputStream() {
super(retryableStatusCodes.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.sender.jdk.internal;

import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.time.Duration;
import java.util.Collections;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class JdkHttpSenderTest {

private final HttpClient realHttpClient =
HttpClient.newBuilder().connectTimeout(Duration.ofMillis(10)).build();
@Mock private HttpClient mockHttpClient;
private JdkHttpSender sender;

@BeforeEach
void setup() throws IOException, InterruptedException {
// Can't directly spy on HttpClient for some reason, so create a real instance and a mock that
// delegates to the real thing
when(mockHttpClient.send(any(), any()))
.thenAnswer(
invocation ->
realHttpClient.send(invocation.getArgument(0), invocation.getArgument(1)));
sender =
new JdkHttpSender(
mockHttpClient,
"http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection
// timeout
false,
"text/plain",
Duration.ofSeconds(10).toNanos(),
Collections::emptyMap,
RetryPolicy.builder()
.setMaxAttempts(2)
.setInitialBackoff(Duration.ofMillis(1))
.build());
}

@Test
void sendInternal_RetryableConnectTimeoutException() throws IOException, InterruptedException {
assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
.isInstanceOf(HttpConnectTimeoutException.class);

verify(mockHttpClient, times(2)).send(any(), any());
}

@Test
void sendInternal_NonRetryableException() throws IOException, InterruptedException {
doThrow(new IOException("unknown error")).when(mockHttpClient).send(any(), any());

assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
.isInstanceOf(IOException.class)
.hasMessage("unknown error");

verify(mockHttpClient, times(1)).send(any(), any());
}

@Test
void defaultConnectTimeout() {
sender =
new JdkHttpSender(
"http://localhost", true, "text/plain", 1, Collections::emptyMap, null, null);

assertThat(sender)
.extracting("client", as(InstanceOfAssertFactories.type(HttpClient.class)))
.satisfies(
httpClient ->
assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10)));
}
}

0 comments on commit e06d35a

Please sign in to comment.