diff --git a/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt index 803c4ecf..87778622 100644 --- a/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt @@ -14,6 +14,7 @@ package com.connectrpc +import kotlinx.coroutines.Deferred import kotlinx.coroutines.channels.ReceiveChannel /** @@ -27,6 +28,25 @@ interface BidirectionalStreamInterface { */ fun responseChannel(): ReceiveChannel + /** + * The response headers. This value will become available before any output + * messages become available from the [responseChannel] and before trailers + * are available from [responseTrailers]. If the stream fails before headers + * are ever received, this will complete with an empty value. The + * [ReceiveChannel.receive] method of [responseChannel] can be used to + * recover the exception that caused such a failure. + */ + fun responseHeaders(): Deferred + + /** + * The response trailers. This value will not become available until the entire + * RPC operation is complete. If the stream fails before trailers are ever + * received, this will complete with an empty value. The [ReceiveChannel.receive] + * method of [responseChannel] can be used to recover the exception that caused + * such a failure. + */ + fun responseTrailers(): Deferred + /** * Send a request to the server over the stream. * diff --git a/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt index 1927fe44..be625bf4 100644 --- a/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt @@ -14,6 +14,8 @@ package com.connectrpc +import kotlinx.coroutines.Deferred + /** * Represents a client-only stream (a stream where the client streams data to the server and * eventually receives a response) that can send request messages and initiate closes. @@ -34,6 +36,24 @@ interface ClientOnlyStreamInterface { */ suspend fun receiveAndClose(): Output + /** + * The response headers. This value will become available before any call to + * [receiveAndClose] completes and before trailers are available from + * [responseTrailers] (though these may occur nearly simultaneously). If the + * stream fails before headers are ever received, this will complete with an + * empty value. The [receiveAndClose] method can be used to recover the + * exception that caused such a failure. + */ + fun responseHeaders(): Deferred + + /** + * The response trailers. This value will not become available until the entire + * RPC operation is complete. If the stream fails before trailers are ever + * received, this will complete with an empty value. The [receiveAndClose] + * method can be used to recover the exception that caused such a failure. + */ + fun responseTrailers(): Deferred + /** * Close the stream. No calls to [send] are valid after calling [sendClose]. */ diff --git a/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt index 6783ee9a..f7adbfae 100644 --- a/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt @@ -14,6 +14,7 @@ package com.connectrpc +import kotlinx.coroutines.Deferred import kotlinx.coroutines.channels.ReceiveChannel /** @@ -28,6 +29,25 @@ interface ServerOnlyStreamInterface { */ fun responseChannel(): ReceiveChannel + /** + * The response headers. This value will become available before any output + * messages become available from the [responseChannel] and before trailers + * are available from [responseTrailers]. If the stream fails before headers + * are ever received, this will complete with an empty value. The + * [ReceiveChannel.receive] method of [responseChannel] can be used to + * recover the exception that caused such a failure. + */ + fun responseHeaders(): Deferred + + /** + * The response trailers. This value will not become available until the entire + * RPC operation is complete. If the stream fails before trailers are ever + * received, this will complete with an empty value. The [ReceiveChannel.receive] + * method of [responseChannel] can be used to recover the exception that caused + * such a failure. + */ + fun responseTrailers(): Deferred + /** * Send a request to the server over the stream and closes the request. * diff --git a/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt b/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt index 9d5e92d3..ccad26b1 100644 --- a/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt @@ -56,7 +56,7 @@ class Stream( private val isReceiveClosed = AtomicReference(false) fun send(buffer: Buffer): Result { - if (isClosed()) { + if (isSendClosed()) { return Result.failure(IllegalStateException("cannot send. underlying stream is closed")) } return try { @@ -76,11 +76,14 @@ class Stream( fun receiveClose() { if (!isReceiveClosed.getAndSet(true)) { onReceiveClose() + // closing receive side implicitly closes send side, too + isSendClosed.set(true) } } + // TODO: remove this method as it is redundant with receive closed fun isClosed(): Boolean { - return isSendClosed() && isReceiveClosed() + return isReceiveClosed() } fun isSendClosed(): Boolean { diff --git a/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt b/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt index 5048b09b..9b0006e7 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt @@ -16,7 +16,9 @@ package com.connectrpc.impl import com.connectrpc.BidirectionalStreamInterface import com.connectrpc.Codec +import com.connectrpc.Headers import com.connectrpc.http.Stream +import kotlinx.coroutines.Deferred import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import java.lang.Exception @@ -27,7 +29,9 @@ import java.lang.Exception internal class BidirectionalStream( val stream: Stream, private val requestCodec: Codec, - private val receiveChannel: Channel, + private val responseChannel: Channel, + private val responseHeaders: Deferred, + private val responseTrailers: Deferred, ) : BidirectionalStreamInterface { override suspend fun send(input: Input): Result { @@ -40,7 +44,15 @@ internal class BidirectionalStream( } override fun responseChannel(): ReceiveChannel { - return receiveChannel + return responseChannel + } + + override fun responseHeaders(): Deferred { + return responseHeaders + } + + override fun responseTrailers(): Deferred { + return responseTrailers } override fun isClosed(): Boolean { diff --git a/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt b/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt index b1e9f052..a712a6e4 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt @@ -18,6 +18,8 @@ import com.connectrpc.BidirectionalStreamInterface import com.connectrpc.ClientOnlyStreamInterface import com.connectrpc.Code import com.connectrpc.ConnectException +import com.connectrpc.Headers +import kotlinx.coroutines.Deferred /** * Concrete implementation of [ClientOnlyStreamInterface]. @@ -44,6 +46,14 @@ internal class ClientOnlyStream( } } + override fun responseHeaders(): Deferred { + return messageStream.responseHeaders() + } + + override fun responseTrailers(): Deferred { + return messageStream.responseTrailers() + } + override fun sendClose() { return messageStream.sendClose() } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt index 79aa94bc..6dcae044 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt @@ -31,6 +31,7 @@ import com.connectrpc.http.HTTPClientInterface import com.connectrpc.http.HTTPRequest import com.connectrpc.http.Stream import com.connectrpc.protocols.GETConfiguration +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.suspendCancellableCoroutine import java.net.URL @@ -178,6 +179,8 @@ class ProtocolClient( headers: Headers, ): BidirectionalStream = suspendCancellableCoroutine { continuation -> val channel = Channel(1) + val responseHeaders = CompletableDeferred() + val responseTrailers = CompletableDeferred() val requestCodec = config.serializationStrategy.codec(methodSpec.requestClass) val responseCodec = config.serializationStrategy.codec(methodSpec.responseClass) val request = HTTPRequest( @@ -197,32 +200,51 @@ class ProtocolClient( // Pass through the interceptor chain. when (val streamResult = streamFunc.streamResultFunction(initialResult)) { is StreamResult.Headers -> { - // Not currently used except for interceptors. + // If this is incorrectly called 2x, only the first result is used. + // Subsequent calls to complete will be ignored. + responseHeaders.complete(streamResult.headers) } is StreamResult.Message -> { + // Just in case protocol impl failed to provide StreamResult.Headers, + // treat headers as empty. This is a no-op if we did correctly receive + // them already. + responseHeaders.complete(emptyMap()) try { val message = responseCodec.deserialize( streamResult.message, ) channel.send(message) } catch (e: Throwable) { + // TODO: setting isComplete, responseTrailers, and RPC status + // here seems wrong. What would prevent the call to + // channel.send such that we don't bother getting the + // actual result/trailers from the server? isComplete = true - channel.close(ConnectException(Code.UNKNOWN, exception = e)) + try { + channel.close(ConnectException(Code.UNKNOWN, exception = e)) + } finally { + responseTrailers.complete(emptyMap()) + } } } is StreamResult.Complete -> { + // This is a no-op if we already received a StreamResult.Headers. + responseHeaders.complete(emptyMap()) isComplete = true - when (streamResult.code) { - Code.OK -> channel.close() - else -> channel.close(streamResult.connectException() ?: ConnectException(code = streamResult.code)) + try { + when (streamResult.code) { + Code.OK -> channel.close() + else -> channel.close(streamResult.connectException() ?: ConnectException(code = streamResult.code)) + } + } finally { + responseTrailers.complete(streamResult.trailers) } } } } continuation.invokeOnCancellation { - httpStream.sendClose() httpStream.receiveClose() } val stream = Stream( @@ -237,7 +259,6 @@ class ProtocolClient( }, ) channel.invokeOnClose { - // Receive channel is closed so the stream's receive will be closed. stream.receiveClose() } continuation.resume( @@ -245,6 +266,8 @@ class ProtocolClient( stream, requestCodec, channel, + responseHeaders, + responseTrailers, ), ) } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt b/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt index b2d01bbd..d1fa9b3e 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt @@ -15,7 +15,9 @@ package com.connectrpc.impl import com.connectrpc.BidirectionalStreamInterface +import com.connectrpc.Headers import com.connectrpc.ServerOnlyStreamInterface +import kotlinx.coroutines.Deferred import kotlinx.coroutines.channels.ReceiveChannel /** @@ -28,6 +30,14 @@ internal class ServerOnlyStream( return messageStream.responseChannel() } + override fun responseHeaders(): Deferred { + return messageStream.responseHeaders() + } + + override fun responseTrailers(): Deferred { + return messageStream.responseTrailers() + } + override suspend fun sendAndClose(input: Input): Result { try { return messageStream.send(input) diff --git a/okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt b/okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt index e1fbbfcf..ef1bdb24 100644 --- a/okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt +++ b/okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt @@ -76,6 +76,8 @@ internal fun OkHttpClient.initializeStream( onReceiveClose = { isReceiveClosed.set(true) call.cancel() + // cancelling implicitly closes send-side, too + isSendClosed.set(true) }, ) }