Skip to content

Commit

Permalink
add support for accessing headers and trailers using CompletableDefer…
Browse files Browse the repository at this point in the history
…red, which completes when the headers or trailers are received
  • Loading branch information
jhump committed Dec 7, 2023
1 parent de27a21 commit 70ec37c
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.connectrpc

import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.ReceiveChannel

/**
Expand All @@ -27,6 +28,25 @@ interface BidirectionalStreamInterface<Input, Output> {
*/
fun responseChannel(): ReceiveChannel<Output>

/**
* The response headers. This value will become available before any output
* messages become available from the [responseChannel] and before trailers
* are available from [responseTrailers]. If the stream fails before headers
* are ever received, this will complete with an empty value. The
* [ReceiveChannel.receive] method of [responseChannel] can be used to
* recover the exception that caused such a failure.
*/
fun responseHeaders(): Deferred<Headers>

/**
* The response trailers. This value will not become available until the entire
* RPC operation is complete. If the stream fails before trailers are ever
* received, this will complete with an empty value. The [ReceiveChannel.receive]
* method of [responseChannel] can be used to recover the exception that caused
* such a failure.
*/
fun responseTrailers(): Deferred<Headers>

/**
* Send a request to the server over the stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.connectrpc

import kotlinx.coroutines.Deferred

/**
* Represents a client-only stream (a stream where the client streams data to the server and
* eventually receives a response) that can send request messages and initiate closes.
Expand All @@ -34,6 +36,24 @@ interface ClientOnlyStreamInterface<Input, Output> {
*/
suspend fun receiveAndClose(): Output

/**
* The response headers. This value will become available before any call to
* [receiveAndClose] completes and before trailers are available from
* [responseTrailers] (though these may occur nearly simultaneously). If the
* stream fails before headers are ever received, this will complete with an
* empty value. The [receiveAndClose] method can be used to recover the
* exception that caused such a failure.
*/
fun responseHeaders(): Deferred<Headers>

/**
* The response trailers. This value will not become available until the entire
* RPC operation is complete. If the stream fails before trailers are ever
* received, this will complete with an empty value. The [receiveAndClose]
* method can be used to recover the exception that caused such a failure.
*/
fun responseTrailers(): Deferred<Headers>

/**
* Close the stream. No calls to [send] are valid after calling [sendClose].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.connectrpc

import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.ReceiveChannel

/**
Expand All @@ -28,6 +29,25 @@ interface ServerOnlyStreamInterface<Input, Output> {
*/
fun responseChannel(): ReceiveChannel<Output>

/**
* The response headers. This value will become available before any output
* messages become available from the [responseChannel] and before trailers
* are available from [responseTrailers]. If the stream fails before headers
* are ever received, this will complete with an empty value. The
* [ReceiveChannel.receive] method of [responseChannel] can be used to
* recover the exception that caused such a failure.
*/
fun responseHeaders(): Deferred<Headers>

/**
* The response trailers. This value will not become available until the entire
* RPC operation is complete. If the stream fails before trailers are ever
* received, this will complete with an empty value. The [ReceiveChannel.receive]
* method of [responseChannel] can be used to recover the exception that caused
* such a failure.
*/
fun responseTrailers(): Deferred<Headers>

/**
* Send a request to the server over the stream and closes the request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Stream(
private val isReceiveClosed = AtomicReference(false)

fun send(buffer: Buffer): Result<Unit> {
if (isClosed()) {
if (isSendClosed()) {
return Result.failure(IllegalStateException("cannot send. underlying stream is closed"))
}
return try {
Expand All @@ -76,11 +76,14 @@ class Stream(
fun receiveClose() {
if (!isReceiveClosed.getAndSet(true)) {
onReceiveClose()
// closing receive side implicitly closes send side, too
isSendClosed.set(true)
}
}

// TODO: remove this method as it is redundant with receive closed
fun isClosed(): Boolean {
return isSendClosed() && isReceiveClosed()
return isReceiveClosed()
}

fun isSendClosed(): Boolean {
Expand Down
16 changes: 14 additions & 2 deletions library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package com.connectrpc.impl

import com.connectrpc.BidirectionalStreamInterface
import com.connectrpc.Codec
import com.connectrpc.Headers
import com.connectrpc.http.Stream
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import java.lang.Exception
Expand All @@ -27,7 +29,9 @@ import java.lang.Exception
internal class BidirectionalStream<Input, Output>(
val stream: Stream,
private val requestCodec: Codec<Input>,
private val receiveChannel: Channel<Output>,
private val responseChannel: Channel<Output>,
private val responseHeaders: Deferred<Headers>,
private val responseTrailers: Deferred<Headers>
) : BidirectionalStreamInterface<Input, Output> {

override suspend fun send(input: Input): Result<Unit> {
Expand All @@ -40,7 +44,15 @@ internal class BidirectionalStream<Input, Output>(
}

override fun responseChannel(): ReceiveChannel<Output> {
return receiveChannel
return responseChannel
}

override fun responseHeaders(): Deferred<Headers> {
return responseHeaders
}

override fun responseTrailers(): Deferred<Headers> {
return responseTrailers
}

override fun isClosed(): Boolean {
Expand Down
10 changes: 10 additions & 0 deletions library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import com.connectrpc.BidirectionalStreamInterface
import com.connectrpc.ClientOnlyStreamInterface
import com.connectrpc.Code
import com.connectrpc.ConnectException
import com.connectrpc.Headers
import kotlinx.coroutines.Deferred

/**
* Concrete implementation of [ClientOnlyStreamInterface].
Expand All @@ -44,6 +46,14 @@ internal class ClientOnlyStream<Input, Output>(
}
}

override fun responseHeaders(): Deferred<Headers> {
return messageStream.responseHeaders()
}

override fun responseTrailers(): Deferred<Headers> {
return messageStream.responseTrailers()
}

override fun sendClose() {
return messageStream.sendClose()
}
Expand Down
37 changes: 30 additions & 7 deletions library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.connectrpc.http.HTTPClientInterface
import com.connectrpc.http.HTTPRequest
import com.connectrpc.http.Stream
import com.connectrpc.protocols.GETConfiguration
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.suspendCancellableCoroutine
import java.net.URL
Expand Down Expand Up @@ -178,6 +179,8 @@ class ProtocolClient(
headers: Headers,
): BidirectionalStream<Input, Output> = suspendCancellableCoroutine { continuation ->
val channel = Channel<Output>(1)
val responseHeaders = CompletableDeferred<Headers>()
val responseTrailers = CompletableDeferred<Headers>()
val requestCodec = config.serializationStrategy.codec(methodSpec.requestClass)
val responseCodec = config.serializationStrategy.codec(methodSpec.responseClass)
val request = HTTPRequest(
Expand All @@ -197,32 +200,51 @@ class ProtocolClient(
// Pass through the interceptor chain.
when (val streamResult = streamFunc.streamResultFunction(initialResult)) {
is StreamResult.Headers -> {
// Not currently used except for interceptors.
// If this is incorrectly called 2x, only the first result is used.
// Subsequent calls to complete will be ignored.
responseHeaders.complete(streamResult.headers)
}

is StreamResult.Message -> {
// Just in case protocol impl failed to provide StreamResult.Headers,
// treat headers as empty. This is a no-op if we did correctly receive
// them already.
responseHeaders.complete(emptyMap())
try {
val message = responseCodec.deserialize(
streamResult.message,
)
channel.send(message)
} catch (e: Throwable) {
// TODO: setting isComplete, responseTrailers, and RPC status
// here seems wrong. What would prevent the call to
// channel.send such that we don't bother getting the
// actual result/trailers from the server?
isComplete = true
channel.close(ConnectException(Code.UNKNOWN, exception = e))
try {
channel.close(ConnectException(Code.UNKNOWN, exception = e))
} finally {
responseTrailers.complete(emptyMap())
}
}
}

is StreamResult.Complete -> {
// This is a no-op if we already received a StreamResult.Headers.
responseHeaders.complete(emptyMap())
isComplete = true
when (streamResult.code) {
Code.OK -> channel.close()
else -> channel.close(streamResult.connectException() ?: ConnectException(code = streamResult.code))
try {
when (streamResult.code) {
Code.OK -> channel.close()
else -> channel.close(streamResult.connectException() ?: ConnectException(code = streamResult.code))
}
} finally {
responseTrailers.complete(streamResult.trailers)
}
}
}
}
continuation.invokeOnCancellation {
httpStream.sendClose()
httpStream.receiveClose()
}
val stream = Stream(
Expand All @@ -237,14 +259,15 @@ class ProtocolClient(
},
)
channel.invokeOnClose {
// Receive channel is closed so the stream's receive will be closed.
stream.receiveClose()
}
continuation.resume(
BidirectionalStream(
stream,
requestCodec,
channel,
responseHeaders,
responseTrailers
),
)
}
Expand Down
10 changes: 10 additions & 0 deletions library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package com.connectrpc.impl

import com.connectrpc.BidirectionalStreamInterface
import com.connectrpc.Headers
import com.connectrpc.ServerOnlyStreamInterface
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.ReceiveChannel

/**
Expand All @@ -28,6 +30,14 @@ internal class ServerOnlyStream<Input, Output>(
return messageStream.responseChannel()
}

override fun responseHeaders(): Deferred<Headers> {
return messageStream.responseHeaders()
}

override fun responseTrailers(): Deferred<Headers> {
return messageStream.responseTrailers()
}

override suspend fun sendAndClose(input: Input): Result<Unit> {
try {
return messageStream.send(input)
Expand Down
2 changes: 2 additions & 0 deletions okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ internal fun OkHttpClient.initializeStream(
onReceiveClose = {
isReceiveClosed.set(true)
call.cancel()
// cancelling implicitly closes send-side, too
isSendClosed.set(true)
},
)
}
Expand Down

0 comments on commit 70ec37c

Please sign in to comment.