diff --git a/library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt b/library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt index 319eb8e9..0e7800ed 100644 --- a/library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt +++ b/library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt @@ -42,10 +42,36 @@ interface BidirectionalStreamInterface { fun close() /** - * Determine if the underlying stream is closed. + * Determine if the underlying send and receive stream is closed. * - * @return true if the underlying stream is closed. If the stream is still open, + * @return true if the underlying send and receive stream is closed. If the stream is still open, * this will return false. */ fun isClosed(): Boolean + + /** + * Close the send stream. No calls to [send] are valid after calling [sendClose]. + */ + fun sendClose() + + /** + * Close the receive stream. + */ + fun receiveClose() + + /** + * Determine if the underlying client send stream is closed. + * + * @return true if the underlying client receive stream is closed. If the stream is still open, + * this will return false. + */ + fun isSendClosed(): Boolean + + /** + * Determine if the underlying client receive stream is closed. + * + * @return true if the underlying client receive stream is closed. If the stream is still open, + * this will return false. + */ + fun isReceiveClosed(): Boolean } diff --git a/library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt b/library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt index 4513b786..9e0010f1 100644 --- a/library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt +++ b/library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt @@ -26,6 +26,13 @@ interface ClientOnlyStreamInterface { */ suspend fun send(input: Input): Result + /** + * Receive a single response and close the stream. + * + * @return the single response [StreamResult]. + */ + suspend fun receiveAndClose(): StreamResult + /** * Close the stream. No calls to [send] are valid after calling [close]. */ diff --git a/library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt b/library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt index be612bf9..ddd664c1 100644 --- a/library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt +++ b/library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt @@ -38,6 +38,17 @@ interface ServerOnlyStreamInterface { */ suspend fun send(input: Input): Result + /** + * Send a request to the server over the stream and closes the request. + * + * Can only be called exactly one time when starting the stream. + * + * @param input The request message to send. + * @return [Result.success] on send success, [Result.failure] on + * any sends which are not successful. + */ + suspend fun sendAndClose(input: Input): Result + /** * Close the stream. No calls to [send] are valid after calling [close]. */ diff --git a/library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt b/library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt index 796b1f44..9e3d8df4 100644 --- a/library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt +++ b/library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt @@ -49,9 +49,11 @@ interface HTTPClientInterface { class Stream( private val onSend: (Buffer) -> Unit, - private val onClose: () -> Unit + private val onSendClose: () -> Unit = {}, + private val onReceiveClose: () -> Unit = {} ) { - private val isClosed = AtomicReference(false) + private val isSendClosed = AtomicReference(false) + private val isReceiveClosed = AtomicReference(false) fun send(buffer: Buffer): Result { if (isClosed()) { @@ -65,13 +67,27 @@ class Stream( } } - fun close() { - if (!isClosed.getAndSet(true)) { - onClose() + fun sendClose() { + if (!isSendClosed.getAndSet(true)) { + onSendClose() + } + } + + fun receiveClose() { + if (!isReceiveClosed.getAndSet(true)) { + onReceiveClose() } } fun isClosed(): Boolean { - return isClosed.get() + return isSendClosed() && isReceiveClosed() + } + + fun isSendClosed(): Boolean { + return isSendClosed.get() + } + + fun isReceiveClosed(): Boolean { + return isReceiveClosed.get() } } diff --git a/library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt b/library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt index e452e11c..f60841c5 100644 --- a/library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt +++ b/library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt @@ -45,10 +45,26 @@ internal class BidirectionalStream( } override fun close() { - stream.close() + stream.sendClose() } override fun isClosed(): Boolean { return stream.isClosed() } + + override fun sendClose() { + stream.sendClose() + } + + override fun receiveClose() { + stream.receiveClose() + } + + override fun isSendClosed(): Boolean { + return stream.isSendClosed() + } + + override fun isReceiveClosed(): Boolean { + return stream.isReceiveClosed() + } } diff --git a/library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt b/library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt index 9e683c97..22489645 100644 --- a/library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt +++ b/library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt @@ -16,6 +16,7 @@ package build.buf.connect.impl import build.buf.connect.BidirectionalStreamInterface import build.buf.connect.ClientOnlyStreamInterface +import build.buf.connect.StreamResult /** * Concrete implementation of [ClientOnlyStreamInterface]. @@ -27,6 +28,15 @@ internal class ClientOnlyStream( return messageStream.send(input) } + override suspend fun receiveAndClose(): StreamResult { + val resultChannel = messageStream.resultChannel() + try { + return resultChannel.receive() + } finally { + resultChannel.cancel() + } + } + override fun close() { messageStream.close() } diff --git a/library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt b/library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt index de857d7e..fd66e091 100644 --- a/library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt +++ b/library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt @@ -192,18 +192,27 @@ class ProtocolClient( channel.send(result) } continuation.invokeOnCancellation { - httpStream.close() + httpStream.sendClose() + httpStream.receiveClose() + } + val stream = Stream( + onSend = { buffer -> + httpStream.send(streamFunc.requestBodyFunction(buffer)) + }, + onReceiveClose = { + httpStream.receiveClose() + }, + onSendClose = { + httpStream.sendClose() + } + ) + channel.invokeOnClose { + // Receive channel is closed so the stream's receive will be closed. + stream.receiveClose() } continuation.resume( BidirectionalStream( - Stream( - onSend = { buffer -> - httpStream.send(streamFunc.requestBodyFunction(buffer)) - }, - onClose = { - httpStream.close() - } - ), + stream, requestCodec, channel ) diff --git a/library/src/main/kotlin/build/buf/connect/impl/ServerOnlyStream.kt b/library/src/main/kotlin/build/buf/connect/impl/ServerOnlyStream.kt index 92538ff4..4199d0e8 100644 --- a/library/src/main/kotlin/build/buf/connect/impl/ServerOnlyStream.kt +++ b/library/src/main/kotlin/build/buf/connect/impl/ServerOnlyStream.kt @@ -29,6 +29,14 @@ internal class ServerOnlyStream( return messageStream.resultChannel() } + override suspend fun sendAndClose(input: Input): Result { + return try { + messageStream.send(input) + } finally { + messageStream.close() + } + } + override suspend fun send(input: Input): Result { return messageStream.send(input) } diff --git a/okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt b/okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt index 08eb8b62..b786d7b1 100644 --- a/okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt +++ b/okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt @@ -48,7 +48,8 @@ internal fun OkHttpClient.initializeStream( request: HTTPRequest, onResult: suspend (StreamResult) -> Unit ): Stream { - val isClosed = AtomicBoolean(false) + val isSendClosed = AtomicBoolean(false) + val isReceiveClosed = AtomicBoolean(false) val duplexRequestBody = PipeDuplexRequestBody(request.contentType.toMediaType()) val builder = Request.Builder() .url(request.url) @@ -60,23 +61,21 @@ internal fun OkHttpClient.initializeStream( } val callRequest = builder.build() val call = newCall(callRequest) - call.enqueue(ResponseCallback(onResult, isClosed)) + call.enqueue(ResponseCallback(onResult, isSendClosed)) return Stream( onSend = { buffer -> - if (!isClosed.get()) { + if (!isSendClosed.get()) { duplexRequestBody.forConsume(buffer) } }, - onClose = { - try { - isClosed.set(true) - call.cancel() - duplexRequestBody.close() - } catch (_: Throwable) { - // No-op - } + onSendClose = { + isSendClosed.set(true) + duplexRequestBody.close() } - ) + ) { + isReceiveClosed.set(true) + call.cancel() + } } private class ResponseCallback(