Skip to content

Commit

Permalink
Add timeout enforcement to ProtocolClient (#276)
Browse files Browse the repository at this point in the history
This moves the timeout enforcement out of the HTTPClientInterface and
into the ProtocolClientInterface. This is more appropriate so that the
protocol implementations can be aware of the timeout and add a timeout
header, to propagate the deadline to the server.
  • Loading branch information
jhump authored May 29, 2024
1 parent 457b0ab commit 76b89e7
Show file tree
Hide file tree
Showing 19 changed files with 442 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.v1.ConformanceServiceClient
import com.connectrpc.conformance.v1.ServerStreamRequest
import com.connectrpc.conformance.v1.ServerStreamResponse
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout

class JavaServerStreamClient(
private val client: ConformanceServiceClient,
Expand All @@ -33,6 +35,21 @@ class JavaServerStreamClient(
try {
sendResult = stream.sendAndClose(req)
if (sendResult.isFailure) {
// It can't be because stream.sendClose was already closed. So the operation
// must have already failed. Extract the reason via a call to receive. But
// if something is awry, don't block forever on the receive call.
try {
withTimeout(50) {
// Waits up to 50 milliseconds.
stream.responseChannel().receive()
}
} catch (_: TimeoutCancellationException) {
// Receive did not complete :(
} catch (ex: Throwable) {
throw ex
}
// Either receive did not complete or it did not fail (which
// shouldn't actually be possible).
throw sendResult.exceptionOrNull()!!
}
} catch (ex: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.v1.ConformanceServiceClient
import com.connectrpc.conformance.v1.ServerStreamRequest
import com.connectrpc.conformance.v1.ServerStreamResponse
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout

class JavaLiteServerStreamClient(
private val client: ConformanceServiceClient,
Expand All @@ -33,6 +35,21 @@ class JavaLiteServerStreamClient(
try {
sendResult = stream.sendAndClose(req)
if (sendResult.isFailure) {
// It can't be because stream.sendClose was already closed. So the operation
// must have already failed. Extract the reason via a call to receive. But
// if something is awry, don't block forever on the receive call.
try {
withTimeout(50) {
// Waits up to 50 milliseconds.
stream.responseChannel().receive()
}
} catch (_: TimeoutCancellationException) {
// Receive did not complete :(
} catch (ex: Throwable) {
throw ex
}
// Either receive did not complete or it did not fail (which
// shouldn't actually be possible).
throw sendResult.exceptionOrNull()!!
}
} catch (ex: Throwable) {
Expand Down
9 changes: 1 addition & 8 deletions conformance/client/known-failing-stream-cases.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1 @@
# We currently rely on OkHttp's "call timeout" to handle
# RPC deadlines, but that is not enforced when the request
# body is duplex. So timeouts don't currently work with
# bidi streams.
Timeouts/HTTPVersion:2/**/bidi-stream/**

# Deadline headers are not currently set.
Deadline Propagation/**
# Currently there are zero failing tests.
3 changes: 1 addition & 2 deletions conformance/client/known-failing-unary-cases.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
# Deadline headers are not currently set.
Deadline Propagation/**
# Currently there are zero failing tests.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import com.connectrpc.conformance.client.adapt.Invoker
import com.connectrpc.conformance.client.adapt.ResponseStream
import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.client.adapt.UnaryClient
import com.connectrpc.http.Cancelable
import com.connectrpc.http.HTTPClientInterface
import com.connectrpc.http.Timeout
import com.connectrpc.impl.ProtocolClient
import com.connectrpc.okhttp.ConnectOkHttpClient
import com.connectrpc.protocols.GETConfiguration
Expand All @@ -57,6 +59,8 @@ import java.security.spec.PKCS8EncodedKeySpec
import java.time.Duration
import java.util.Base64
import kotlin.reflect.cast
import kotlin.time.DurationUnit
import kotlin.time.toDuration

/**
* The conformance client. This contains the logic for invoking an
Expand Down Expand Up @@ -441,9 +445,6 @@ class Client(
val certs = certs(req)
clientBuilder = clientBuilder.sslSocketFactory(certs.sslSocketFactory(), certs.trustManager)
}
if (req.timeoutMs != 0) {
clientBuilder = clientBuilder.callTimeout(Duration.ofMillis(req.timeoutMs.toLong()))
}
// TODO: need to support max receive bytes and use req.receiveLimitBytes
val getConfig = if (req.useGetHttpMethod) GETConfiguration.Enabled else GETConfiguration.Disabled
val requestCompression =
Expand All @@ -458,11 +459,25 @@ class Client(
} else {
emptyList()
}
val httpClient = clientBuilder.build()
val httpClient = ConnectOkHttpClient.configureClient(clientBuilder).build()
var connectHttpClient: HTTPClientInterface = ConnectOkHttpClient(httpClient)
args.verbose.withPrefix("http client interface: ").verbosity(3) {
connectHttpClient = TracingHTTPClient(connectHttpClient, this)
}
var timeoutScheduler = Timeout.DEFAULT_SCHEDULER
args.verbose.verbosity(3) {
val verbosePrinter = this
timeoutScheduler = object : Timeout.Scheduler {
override fun scheduleTimeout(delay: kotlin.time.Duration, action: Cancelable): Timeout {
verbosePrinter.println("Scheduling timeout in $delay...")
val timeout = Timeout.DEFAULT_SCHEDULER.scheduleTimeout(delay) {
verbosePrinter.println("Timeout elapsed! Cancelling...")
action()
}
return timeout
}
}
}
return Pair(
httpClient,
ProtocolClient(
Expand All @@ -474,6 +489,14 @@ class Client(
getConfiguration = getConfig,
requestCompression = requestCompression,
compressionPools = compressionPools,
timeoutScheduler = timeoutScheduler,
timeoutOracle = {
if (req.timeoutMs == 0) {
null
} else {
req.timeoutMs.toDuration(DurationUnit.MILLISECONDS)
}
},
),
),
)
Expand Down
35 changes: 35 additions & 0 deletions library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,42 @@ package com.connectrpc

import com.connectrpc.compression.CompressionPool
import com.connectrpc.compression.GzipCompressionPool
import com.connectrpc.http.Timeout
import com.connectrpc.protocols.ConnectInterceptor
import com.connectrpc.protocols.GETConfiguration
import com.connectrpc.protocols.GRPCInterceptor
import com.connectrpc.protocols.GRPCWebInterceptor
import com.connectrpc.protocols.NetworkProtocol
import java.net.URI
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration

typealias TimeoutOracle = (MethodSpec<*, *>) -> Duration?

/**
* Returns an oracle that provides the given timeouts for unary or stream
* operations, respectively.
*/
fun simpleTimeouts(unaryTimeout: Duration?, streamTimeout: Duration?): TimeoutOracle {
return { methodSpec ->
when (methodSpec.streamType) {
StreamType.UNARY -> unaryTimeout
else -> streamTimeout
}
}
}

/**
* Set of configuration used to set up clients.
*/
class ProtocolClientConfig @JvmOverloads constructor(
// TODO: Use a block-based construction pattern instead of JvmOverloads
// so we can add new fields in the future without having to worry
// about their ordering or potentially breaking compatibility with
// already-compiled byte code.

// The host (e.g., https://connectrpc.com).
val host: String,
// The client to use for performing requests.
Expand All @@ -54,6 +78,17 @@ class ProtocolClientConfig @JvmOverloads constructor(
// blocking will automatically be dispatched using the given context,
// so the caller does not need to worry about it.
val ioCoroutineContext: CoroutineContext? = null,
// A function that is consulted to determine timeouts for each RPC. If
// the function returns null, no timeout is applied. If a non-null value
// is returned, the entire call must complete before it elapses. If the
// call is still active at the end of the timeout period, it is cancelled
// and will result in an exception with a Code.DEADLINE_EXCEEDED code.
//
// The default oracle, if not configured, returns a 10 second timeout for
// all operations.
val timeoutOracle: TimeoutOracle = { 10.toDuration(DurationUnit.SECONDS) },
// Schedules timeout actions.
val timeoutScheduler: Timeout.Scheduler = Timeout.DEFAULT_SCHEDULER,
) {
private val internalInterceptorFactoryList = mutableListOf<(ProtocolClientConfig) -> Interceptor>()
private val compressionPools = mutableMapOf<String, CompressionPool>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.connectrpc.http
import com.connectrpc.StreamResult
import okio.Buffer

/** A function that cancels an operation when called. */
typealias Cancelable = () -> Unit

/**
Expand Down
13 changes: 12 additions & 1 deletion library/src/main/kotlin/com/connectrpc/http/HTTPRequest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.connectrpc.Headers
import com.connectrpc.MethodSpec
import okio.Buffer
import java.net.URL
import kotlin.time.Duration

enum class HTTPMethod(
val string: String,
Expand All @@ -34,6 +35,8 @@ open class HTTPRequest internal constructor(
val url: URL,
// Value to assign to the `content-type` header.
val contentType: String,
// The optional timeout for this request.
val timeout: Duration?,
// Additional outbound headers for the request.
val headers: Headers,
// The method spec associated with the request.
Expand All @@ -51,6 +54,8 @@ fun HTTPRequest.clone(
url: URL = this.url,
// Value to assign to the `content-type` header.
contentType: String = this.contentType,
// The optional timeout for this request.
timeout: Duration? = this.timeout,
// Additional outbound headers for the request.
headers: Headers = this.headers,
// The method spec associated with the request.
Expand All @@ -59,6 +64,7 @@ fun HTTPRequest.clone(
return HTTPRequest(
url,
contentType,
timeout,
headers,
methodSpec,
)
Expand All @@ -73,6 +79,8 @@ class UnaryHTTPRequest(
url: URL,
// Value to assign to the `content-type` header.
contentType: String,
// The optional timeout for this request.
timeout: Duration?,
// Additional outbound headers for the request.
headers: Headers,
// The method spec associated with the request.
Expand All @@ -82,13 +90,15 @@ class UnaryHTTPRequest(
// HTTP method to use with the request.
// Almost always POST, but side effect free unary RPCs may be made with GET.
val httpMethod: HTTPMethod = HTTPMethod.POST,
) : HTTPRequest(url, contentType, headers, methodSpec)
) : HTTPRequest(url, contentType, timeout, headers, methodSpec)

fun UnaryHTTPRequest.clone(
// The URL for the request.
url: URL = this.url,
// Value to assign to the `content-type` header.
contentType: String = this.contentType,
// The optional timeout for this request.
timeout: Duration? = this.timeout,
// Additional outbound headers for the request.
headers: Headers = this.headers,
// The method spec associated with the request.
Expand All @@ -101,6 +111,7 @@ fun UnaryHTTPRequest.clone(
return UnaryHTTPRequest(
url,
contentType,
timeout,
headers,
methodSpec,
message,
Expand Down
86 changes: 86 additions & 0 deletions library/src/main/kotlin/com/connectrpc/http/Timeout.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022-2023 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.connectrpc.http

import kotlinx.coroutines.delay
import java.util.Timer
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.timerTask
import kotlin.time.Duration

/**
* Represents the timeout state for an RPC.
*/
class Timeout private constructor(
private val timeoutAction: Cancelable,
) {
private val done = AtomicBoolean(false)

@Volatile private var triggered: Boolean = false
private var onCancel: Cancelable? = null

/** Returns true if this timeout has lapsed and the associated RPC canceled. */
val timedOut: Boolean
get() = triggered

/**
* Cancels the timeout. Should only be called when the RPC completes before the
* timeout elapses. Returns true if the timeout was canceled or false if either
* it was already previously canceled or has already timed out. The `timedOut`
* property can be queried to distinguish between these two possibilities.
*/
fun cancel(): Boolean {
if (done.compareAndSet(false, true)) {
onCancel?.invoke()
return true
}
return false
}

private fun trigger() {
if (done.compareAndSet(false, true)) {
triggered = true
timeoutAction()
}
}

/** Schedules timeouts for RPCs. */
interface Scheduler {
/**
* Schedules a timeout that should invoke the given action to cancel
* an RPC after the given delay.
*/

fun scheduleTimeout(delay: Duration, action: Cancelable): Timeout
}

companion object {
/**
* A default implementation that a Timer backed by a single daemon thread.
* The thread isn't started until the first cancelation is scheduled.
*/
val DEFAULT_SCHEDULER = object : Scheduler {
override fun scheduleTimeout(delay: Duration, action: Cancelable): Timeout {
val timeout = Timeout(action)
val task = timerTask { timeout.trigger() }
timer.value.schedule(task, delay.inWholeMilliseconds)
timeout.onCancel = { task.cancel() }
return timeout
}
}

private val timer = lazy { Timer(Scheduler::class.qualifiedName, true) }
}
}
Loading

0 comments on commit 76b89e7

Please sign in to comment.