Skip to content

Commit

Permalink
feat: OkHttp4Engine (#1150)
Browse files Browse the repository at this point in the history
  • Loading branch information
lauzadis authored Sep 13, 2024
1 parent be6b82d commit cbf98f7
Show file tree
Hide file tree
Showing 19 changed files with 454 additions and 51 deletions.
5 changes: 5 additions & 0 deletions .changes/7acc674e-0dfe-461a-a5c3-f946e14a3ec7.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"id": "7acc674e-0dfe-461a-a5c3-f946e14a3ec7",
"type": "feature",
"description": "Add OkHttp4Engine, an HTTP engine which uses okhttp3:4.x"
}
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ aws-kotlin-repo-tools-version = "0.4.10"
coroutines-version = "1.8.1"
atomicfu-version = "0.24.0"
okhttp-version = "5.0.0-alpha.14"
okhttp4-version = "4.12.0"
okio-version = "3.9.0"
otel-version = "1.32.0"
slf4j-version = "2.0.9"
Expand Down Expand Up @@ -51,6 +52,7 @@ kotlinx-coroutines-slf4j = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-

okio = { module = "com.squareup.okio:okio", version.ref = "okio-version" }
okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp-version" }
okhttp4 = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp4-version" }
okhttp-coroutines = { module = "com.squareup.okhttp3:okhttp-coroutines", version.ref = "okhttp-version" }
opentelemetry-api = { module = "io.opentelemetry:opentelemetry-api", version.ref = "otel-version" }
opentelemetry-sdk-testing = {module = "io.opentelemetry:opentelemetry-sdk-testing", version.ref = "otel-version" }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,54 @@
public final class aws/smithy/kotlin/runtime/http/engine/okhttp/HttpEngineEventListener : okhttp3/EventListener {
public fun <init> (Lokhttp3/ConnectionPool;Laws/smithy/kotlin/runtime/net/HostResolver;Lokhttp3/Dispatcher;Laws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics;Lokhttp3/Call;)V
public fun cacheConditionalHit (Lokhttp3/Call;Lokhttp3/Response;)V
public fun cacheHit (Lokhttp3/Call;Lokhttp3/Response;)V
public fun cacheMiss (Lokhttp3/Call;)V
public fun callEnd (Lokhttp3/Call;)V
public fun callFailed (Lokhttp3/Call;Ljava/io/IOException;)V
public fun callStart (Lokhttp3/Call;)V
public fun canceled (Lokhttp3/Call;)V
public fun connectEnd (Lokhttp3/Call;Ljava/net/InetSocketAddress;Ljava/net/Proxy;Lokhttp3/Protocol;)V
public fun connectFailed (Lokhttp3/Call;Ljava/net/InetSocketAddress;Ljava/net/Proxy;Lokhttp3/Protocol;Ljava/io/IOException;)V
public fun connectStart (Lokhttp3/Call;Ljava/net/InetSocketAddress;Ljava/net/Proxy;)V
public fun connectionAcquired (Lokhttp3/Call;Lokhttp3/Connection;)V
public fun connectionReleased (Lokhttp3/Call;Lokhttp3/Connection;)V
public fun dnsEnd (Lokhttp3/Call;Ljava/lang/String;Ljava/util/List;)V
public fun dnsStart (Lokhttp3/Call;Ljava/lang/String;)V
public fun proxySelectEnd (Lokhttp3/Call;Lokhttp3/HttpUrl;Ljava/util/List;)V
public fun proxySelectStart (Lokhttp3/Call;Lokhttp3/HttpUrl;)V
public fun requestBodyEnd (Lokhttp3/Call;J)V
public fun requestBodyStart (Lokhttp3/Call;)V
public fun requestFailed (Lokhttp3/Call;Ljava/io/IOException;)V
public fun requestHeadersEnd (Lokhttp3/Call;Lokhttp3/Request;)V
public fun requestHeadersStart (Lokhttp3/Call;)V
public fun responseBodyEnd (Lokhttp3/Call;J)V
public fun responseBodyStart (Lokhttp3/Call;)V
public fun responseFailed (Lokhttp3/Call;Ljava/io/IOException;)V
public fun responseHeadersEnd (Lokhttp3/Call;Lokhttp3/Response;)V
public fun responseHeadersStart (Lokhttp3/Call;)V
public fun satisfactionFailure (Lokhttp3/Call;Lokhttp3/Response;)V
public fun secureConnectEnd (Lokhttp3/Call;Lokhttp3/Handshake;)V
public fun secureConnectStart (Lokhttp3/Call;)V
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/MetricsInterceptor : okhttp3/Interceptor {
public static final field INSTANCE Laws/smithy/kotlin/runtime/http/engine/okhttp/MetricsInterceptor;
public fun intercept (Lokhttp3/Interceptor$Chain;)Lokhttp3/Response;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpCall : aws/smithy/kotlin/runtime/http/HttpCall {
public fun <init> (Laws/smithy/kotlin/runtime/http/request/HttpRequest;Laws/smithy/kotlin/runtime/http/response/HttpResponse;Laws/smithy/kotlin/runtime/time/Instant;Laws/smithy/kotlin/runtime/time/Instant;Lkotlin/coroutines/CoroutineContext;Lokhttp3/Call;)V
public synthetic fun <init> (Laws/smithy/kotlin/runtime/http/request/HttpRequest;Laws/smithy/kotlin/runtime/http/response/HttpResponse;Laws/smithy/kotlin/runtime/time/Instant;Laws/smithy/kotlin/runtime/time/Instant;Lkotlin/coroutines/CoroutineContext;Lokhttp3/Call;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun cancelInFlight ()V
public fun copy (Laws/smithy/kotlin/runtime/http/request/HttpRequest;Laws/smithy/kotlin/runtime/http/response/HttpResponse;)Laws/smithy/kotlin/runtime/http/HttpCall;
public final fun getCall ()Lokhttp3/Call;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpDns : okhttp3/Dns {
public fun <init> (Laws/smithy/kotlin/runtime/net/HostResolver;)V
public fun lookup (Ljava/lang/String;)Ljava/util/List;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineBase {
public static final field Companion Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine$Companion;
public fun <init> ()V
Expand Down Expand Up @@ -32,3 +83,51 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConf
public final fun invoke (Lkotlin/jvm/functions/Function1;)Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineKt {
public static final fun buildClient (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig;Laws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics;)Lokhttp3/OkHttpClient;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpHeadersAdapter : aws/smithy/kotlin/runtime/http/Headers {
public fun <init> (Lokhttp3/Headers;)V
public fun contains (Ljava/lang/String;)Z
public synthetic fun contains (Ljava/lang/String;Ljava/lang/Object;)Z
public fun contains (Ljava/lang/String;Ljava/lang/String;)Z
public fun entries ()Ljava/util/Set;
public fun forEach (Lkotlin/jvm/functions/Function2;)V
public synthetic fun get (Ljava/lang/String;)Ljava/lang/Object;
public fun get (Ljava/lang/String;)Ljava/lang/String;
public fun getAll (Ljava/lang/String;)Ljava/util/List;
public fun getCaseInsensitiveName ()Z
public fun isEmpty ()Z
public fun names ()Ljava/util/Set;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpProxyAuthenticator : okhttp3/Authenticator {
public fun <init> (Laws/smithy/kotlin/runtime/http/engine/ProxySelector;)V
public fun authenticate (Lokhttp3/Route;Lokhttp3/Response;)Lokhttp3/Request;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpProxySelector : java/net/ProxySelector {
public fun <init> (Laws/smithy/kotlin/runtime/http/engine/ProxySelector;)V
public fun connectFailed (Ljava/net/URI;Ljava/net/SocketAddress;Ljava/io/IOException;)V
public fun select (Ljava/net/URI;)Ljava/util/List;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpUtilsKt {
public static final fun errCode (Ljava/lang/Exception;)Laws/smithy/kotlin/runtime/http/HttpErrorCode;
public static final fun mapOkHttpExceptions (Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
public static final fun toOkHttpHeaders (Laws/smithy/kotlin/runtime/http/Headers;)Lokhttp3/Headers;
public static final fun toOkHttpRequest (Laws/smithy/kotlin/runtime/http/request/HttpRequest;Laws/smithy/kotlin/runtime/operation/ExecutionContext;Lkotlin/coroutines/CoroutineContext;Laws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics;)Lokhttp3/Request;
public static final fun toSdkResponse (Lokhttp3/Response;)Laws/smithy/kotlin/runtime/http/response/HttpResponse;
public static final fun toUrl (Ljava/net/URI;)Laws/smithy/kotlin/runtime/net/url/Url;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/StreamingRequestBody : okhttp3/RequestBody {
public fun <init> (Laws/smithy/kotlin/runtime/http/HttpBody;Lkotlin/coroutines/CoroutineContext;)V
public fun contentLength ()J
public fun contentType ()Lokhttp3/MediaType;
public fun isDuplex ()Z
public fun isOneShot ()Z
public fun writeTo (Lokio/BufferedSink;)V
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.ExperimentalApi
import aws.smithy.kotlin.runtime.InternalApi
import aws.smithy.kotlin.runtime.http.engine.EngineAttributes
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.net.HostResolver
Expand All @@ -31,19 +32,20 @@ internal const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.okht

// see https://square.github.io/okhttp/features/events/#eventlistener for example callback flow
@OptIn(ExperimentalApi::class)
internal class HttpEngineEventListener(
@InternalApi
public class HttpEngineEventListener(
private val pool: ConnectionPool,
private val hr: HostResolver,
private val dispatcher: Dispatcher,
private val metrics: HttpClientMetrics,
call: Call,
) : EventListener() {
private val provider: TelemetryProvider = call.request().tag<SdkRequestTag>()?.callContext?.telemetryProvider ?: TelemetryProvider.None
private val provider: TelemetryProvider = call.request().tag(SdkRequestTag::class.java)?.callContext?.telemetryProvider ?: TelemetryProvider.None
private val traceSpan = provider.tracerProvider
.getOrCreateTracer(TELEMETRY_SCOPE)
.createSpan("HTTP")

private val logger = call.request().tag<SdkRequestTag>()?.callContext?.logger<OkHttpEngine>() ?: LoggerProvider.None.getLogger<OkHttpEngine>()
private val logger = call.request().tag(SdkRequestTag::class.java)?.callContext?.logger<OkHttpEngine>() ?: LoggerProvider.None.getLogger<OkHttpEngine>()

// callStart() is invoked immediately when enqueued, next success phase is either dnsStart() or connectionAcquired()
// see https://github.com/square/okhttp/blob/7c92ed0879477eddb2fce6b4066d151525d5687f/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt#L167-L175
Expand Down Expand Up @@ -84,22 +86,22 @@ internal class HttpEngineEventListener(
trace { "dns query: domain=$domainName" }
}

override fun dnsEnd(call: Call, domainName: String, inetAddressList: List<InetAddress>) =
override fun dnsEnd(call: Call, domainName: String, inetAddressList: List<InetAddress>): Unit =
trace { "dns resolved: domain=$domainName; records=$inetAddressList" }

override fun proxySelectStart(call: Call, url: HttpUrl) = trace { "proxy select start: url=$url" }
override fun proxySelectStart(call: Call, url: HttpUrl): Unit = trace { "proxy select start: url=$url" }

override fun proxySelectEnd(call: Call, url: HttpUrl, proxies: List<Proxy>) =
override fun proxySelectEnd(call: Call, url: HttpUrl, proxies: List<Proxy>): Unit =
trace { "proxy select end: url=$url; proxies=$proxies" }

override fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy) =
override fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy): Unit =
trace { "starting connection: addr=$inetSocketAddress; proxy=$proxy" }

override fun secureConnectStart(call: Call) = trace { "initiating TLS connection" }
override fun secureConnectStart(call: Call): Unit = trace { "initiating TLS connection" }

override fun secureConnectEnd(call: Call, handshake: Handshake?) = trace { "TLS connect end: handshake=$handshake" }
override fun secureConnectEnd(call: Call, handshake: Handshake?): Unit = trace { "TLS connect end: handshake=$handshake" }

override fun connectEnd(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy, protocol: Protocol?) =
override fun connectEnd(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy, protocol: Protocol?): Unit =
trace { "connection established: addr=$inetSocketAddress; proxy=$proxy; protocol=$protocol" }

override fun connectFailed(
Expand Down Expand Up @@ -139,7 +141,7 @@ internal class HttpEngineEventListener(
trace { "connection acquired: conn(id=$connId)=$connection; connPool: total=${pool.connectionCount()}, idle=${pool.idleConnectionCount()}" }
}

override fun requestHeadersStart(call: Call) = trace { "sending request headers" }
override fun requestHeadersStart(call: Call): Unit = trace { "sending request headers" }

override fun requestHeadersEnd(call: Call, request: Request) {
if (request.body == null) {
Expand All @@ -149,34 +151,34 @@ internal class HttpEngineEventListener(
trace { "finished sending request headers" }
}

override fun requestBodyStart(call: Call) = trace { "sending request body" }
override fun requestBodyStart(call: Call): Unit = trace { "sending request body" }

override fun requestBodyEnd(call: Call, byteCount: Long) {
requestTimeEnd = TimeSource.Monotonic.markNow()
trace { "finished sending request body: bytesSent=$byteCount" }
}

override fun requestFailed(call: Call, ioe: IOException) = trace(ioe) { "request failed" }
override fun requestFailed(call: Call, ioe: IOException): Unit = trace(ioe) { "request failed" }

override fun responseHeadersStart(call: Call) {
requestTimeEnd?.elapsedNow()?.let { ttfb ->
metrics.timeToFirstByteDuration.recordSeconds(ttfb)
call.request().tag<SdkRequestTag>()?.execContext?.set(EngineAttributes.TimeToFirstByte, ttfb)
call.request().tag(SdkRequestTag::class.java)?.execContext?.set(EngineAttributes.TimeToFirstByte, ttfb)
}
trace { "response headers start" }
}

override fun responseHeadersEnd(call: Call, response: Response) {
val contentLength = response.body.contentLength()
val contentLength = response.body?.contentLength()
trace { "response headers end: contentLengthHeader=$contentLength" }
}

override fun responseBodyStart(call: Call) = trace { "response body available" }
override fun responseBodyStart(call: Call): Unit = trace { "response body available" }

override fun responseBodyEnd(call: Call, byteCount: Long) =
override fun responseBodyEnd(call: Call, byteCount: Long): Unit =
trace { "response body finished: bytesConsumed=$byteCount" }

override fun responseFailed(call: Call, ioe: IOException) = trace(ioe) { "response failed" }
override fun responseFailed(call: Call, ioe: IOException): Unit = trace(ioe) { "response failed" }

override fun connectionReleased(call: Call, connection: Connection) {
metrics.acquiredConnections = pool.connectionCount().toLong()
Expand All @@ -201,16 +203,16 @@ internal class HttpEngineEventListener(
traceSpan.close()
}

override fun canceled(call: Call) = trace { "call cancelled" }
override fun canceled(call: Call): Unit = trace { "call cancelled" }

// NOTE: we don't configure a cache and should never get the rest of these events,
// seeing these messages logged means we configured something wrong

override fun satisfactionFailure(call: Call, response: Response) = trace { "cache satisfaction failure" }
override fun satisfactionFailure(call: Call, response: Response): Unit = trace { "cache satisfaction failure" }

override fun cacheConditionalHit(call: Call, cachedResponse: Response) = trace { "cache conditional hit" }
override fun cacheConditionalHit(call: Call, cachedResponse: Response): Unit = trace { "cache conditional hit" }

override fun cacheHit(call: Call, response: Response) = trace { "cache hit" }
override fun cacheHit(call: Call, response: Response): Unit = trace { "cache hit" }

override fun cacheMiss(call: Call) = trace { "cache miss" }
override fun cacheMiss(call: Call): Unit = trace { "cache miss" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.InternalApi
import aws.smithy.kotlin.runtime.collections.Attributes
import aws.smithy.kotlin.runtime.collections.attributesOf
import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter
Expand All @@ -13,10 +14,11 @@ import okio.*
/**
* Instrument the HTTP throughput metrics (e.g. bytes rcvd/sent)
*/
internal object MetricsInterceptor : Interceptor {
@InternalApi
public object MetricsInterceptor : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val originalRequest = chain.request()
val metrics = originalRequest.tag<SdkRequestTag>()?.metrics ?: return chain.proceed(originalRequest)
val metrics = originalRequest.tag(SdkRequestTag::class.java)?.metrics ?: return chain.proceed(originalRequest)

val attrs = attributesOf { "server.address" to "${originalRequest.url.host}:${originalRequest.url.port}" }
val request = if (originalRequest.body != null) {
Expand All @@ -28,12 +30,12 @@ internal object MetricsInterceptor : Interceptor {
}

val originalResponse = chain.proceed(request)
val response = if (originalResponse.body.contentLength() != 0L) {
val response = if (originalResponse.body == null || originalResponse.body?.contentLength() == 0L) {
originalResponse
} else {
originalResponse.newBuilder()
.body(originalResponse.body.instrument(metrics.bytesReceived, attrs))
.build()
} else {
originalResponse
}

return response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.InternalApi
import aws.smithy.kotlin.runtime.http.HttpCall
import aws.smithy.kotlin.runtime.http.config.EngineFactory
import aws.smithy.kotlin.runtime.http.engine.*
Expand Down Expand Up @@ -64,7 +65,7 @@ public class OkHttpEngine(
// else). In both cases we need to ensure that the engine-side resources are cleaned up completely
// since they wouldn't otherwise be. https://github.com/smithy-lang/smithy-kotlin/issues/1061
if (cause != null) call.cancelInFlight()
engineResponse.body.close()
engineResponse.body?.close()
}
}
}
Expand All @@ -79,7 +80,8 @@ public class OkHttpEngine(
/**
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
*/
private fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient {
@InternalApi
public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient {
val config = this

return OkHttpClient.Builder().apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.InternalApi
import aws.smithy.kotlin.runtime.http.Headers as SdkHeaders
import okhttp3.Headers as OkHttpHeaders

/**
* Proxy [okhttp3.Headers] as [aws.smithy.kotlin.runtime.http.Headers]
*/
internal class OkHttpHeadersAdapter(private val headers: OkHttpHeaders) : SdkHeaders {
@InternalApi
public class OkHttpHeadersAdapter(private val headers: OkHttpHeaders) : SdkHeaders {
override val caseInsensitiveName: Boolean = true

override fun getAll(name: String): List<String>? =
Expand Down
Loading

0 comments on commit cbf98f7

Please sign in to comment.