diff --git a/library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt b/library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt
index 319eb8e9..0e7800ed 100644
--- a/library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt
+++ b/library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt
@@ -42,10 +42,36 @@ interface BidirectionalStreamInterface {
fun close()
/**
- * Determine if the underlying stream is closed.
+ * Determine if the underlying send and receive stream is closed.
*
- * @return true if the underlying stream is closed. If the stream is still open,
+ * @return true if the underlying send and receive stream is closed. If the stream is still open,
* this will return false.
*/
fun isClosed(): Boolean
+
+ /**
+ * Close the send stream. No calls to [send] are valid after calling [sendClose].
+ */
+ fun sendClose()
+
+ /**
+ * Close the receive stream.
+ */
+ fun receiveClose()
+
+ /**
+ * Determine if the underlying client send stream is closed.
+ *
+ * @return true if the underlying client receive stream is closed. If the stream is still open,
+ * this will return false.
+ */
+ fun isSendClosed(): Boolean
+
+ /**
+ * Determine if the underlying client receive stream is closed.
+ *
+ * @return true if the underlying client receive stream is closed. If the stream is still open,
+ * this will return false.
+ */
+ fun isReceiveClosed(): Boolean
}
diff --git a/library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt b/library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt
index 4513b786..9e0010f1 100644
--- a/library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt
+++ b/library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt
@@ -26,6 +26,13 @@ interface ClientOnlyStreamInterface {
*/
suspend fun send(input: Input): Result
+ /**
+ * Receive a single response and close the stream.
+ *
+ * @return the single response [StreamResult].
+ */
+ suspend fun receiveAndClose(): StreamResult
+
/**
* Close the stream. No calls to [send] are valid after calling [close].
*/
diff --git a/library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt b/library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt
index be612bf9..ddd664c1 100644
--- a/library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt
+++ b/library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt
@@ -38,6 +38,17 @@ interface ServerOnlyStreamInterface {
*/
suspend fun send(input: Input): Result
+ /**
+ * Send a request to the server over the stream and closes the request.
+ *
+ * Can only be called exactly one time when starting the stream.
+ *
+ * @param input The request message to send.
+ * @return [Result.success] on send success, [Result.failure] on
+ * any sends which are not successful.
+ */
+ suspend fun sendAndClose(input: Input): Result
+
/**
* Close the stream. No calls to [send] are valid after calling [close].
*/
diff --git a/library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt b/library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt
index 796b1f44..9e3d8df4 100644
--- a/library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt
+++ b/library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt
@@ -49,9 +49,11 @@ interface HTTPClientInterface {
class Stream(
private val onSend: (Buffer) -> Unit,
- private val onClose: () -> Unit
+ private val onSendClose: () -> Unit = {},
+ private val onReceiveClose: () -> Unit = {}
) {
- private val isClosed = AtomicReference(false)
+ private val isSendClosed = AtomicReference(false)
+ private val isReceiveClosed = AtomicReference(false)
fun send(buffer: Buffer): Result {
if (isClosed()) {
@@ -65,13 +67,27 @@ class Stream(
}
}
- fun close() {
- if (!isClosed.getAndSet(true)) {
- onClose()
+ fun sendClose() {
+ if (!isSendClosed.getAndSet(true)) {
+ onSendClose()
+ }
+ }
+
+ fun receiveClose() {
+ if (!isReceiveClosed.getAndSet(true)) {
+ onReceiveClose()
}
}
fun isClosed(): Boolean {
- return isClosed.get()
+ return isSendClosed() && isReceiveClosed()
+ }
+
+ fun isSendClosed(): Boolean {
+ return isSendClosed.get()
+ }
+
+ fun isReceiveClosed(): Boolean {
+ return isReceiveClosed.get()
}
}
diff --git a/library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt b/library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt
index e452e11c..f60841c5 100644
--- a/library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt
+++ b/library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt
@@ -45,10 +45,26 @@ internal class BidirectionalStream (
}
override fun close() {
- stream.close()
+ stream.sendClose()
}
override fun isClosed(): Boolean {
return stream.isClosed()
}
+
+ override fun sendClose() {
+ stream.sendClose()
+ }
+
+ override fun receiveClose() {
+ stream.receiveClose()
+ }
+
+ override fun isSendClosed(): Boolean {
+ return stream.isSendClosed()
+ }
+
+ override fun isReceiveClosed(): Boolean {
+ return stream.isReceiveClosed()
+ }
}
diff --git a/library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt b/library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt
index 9e683c97..22489645 100644
--- a/library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt
+++ b/library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt
@@ -16,6 +16,7 @@ package build.buf.connect.impl
import build.buf.connect.BidirectionalStreamInterface
import build.buf.connect.ClientOnlyStreamInterface
+import build.buf.connect.StreamResult
/**
* Concrete implementation of [ClientOnlyStreamInterface].
@@ -27,6 +28,15 @@ internal class ClientOnlyStream (
return messageStream.send(input)
}
+ override suspend fun receiveAndClose(): StreamResult {
+ val resultChannel = messageStream.resultChannel()
+ try {
+ return resultChannel.receive()
+ } finally {
+ resultChannel.cancel()
+ }
+ }
+
override fun close() {
messageStream.close()
}
diff --git a/library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt b/library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt
index de857d7e..fd66e091 100644
--- a/library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt
+++ b/library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt
@@ -192,18 +192,27 @@ class ProtocolClient(
channel.send(result)
}
continuation.invokeOnCancellation {
- httpStream.close()
+ httpStream.sendClose()
+ httpStream.receiveClose()
+ }
+ val stream = Stream(
+ onSend = { buffer ->
+ httpStream.send(streamFunc.requestBodyFunction(buffer))
+ },
+ onReceiveClose = {
+ httpStream.receiveClose()
+ },
+ onSendClose = {
+ httpStream.sendClose()
+ }
+ )
+ channel.invokeOnClose {
+ // Receive channel is closed so the stream's receive will be closed.
+ stream.receiveClose()
}
continuation.resume(
BidirectionalStream(
- Stream(
- onSend = { buffer ->
- httpStream.send(streamFunc.requestBodyFunction(buffer))
- },
- onClose = {
- httpStream.close()
- }
- ),
+ stream,
requestCodec,
channel
)
diff --git a/library/src/main/kotlin/build/buf/connect/impl/ServerOnlyStream.kt b/library/src/main/kotlin/build/buf/connect/impl/ServerOnlyStream.kt
index 92538ff4..4199d0e8 100644
--- a/library/src/main/kotlin/build/buf/connect/impl/ServerOnlyStream.kt
+++ b/library/src/main/kotlin/build/buf/connect/impl/ServerOnlyStream.kt
@@ -29,6 +29,14 @@ internal class ServerOnlyStream (
return messageStream.resultChannel()
}
+ override suspend fun sendAndClose(input: Input): Result {
+ return try {
+ messageStream.send(input)
+ } finally {
+ messageStream.close()
+ }
+ }
+
override suspend fun send(input: Input): Result {
return messageStream.send(input)
}
diff --git a/okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt b/okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt
index 08eb8b62..b786d7b1 100644
--- a/okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt
+++ b/okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt
@@ -48,7 +48,8 @@ internal fun OkHttpClient.initializeStream(
request: HTTPRequest,
onResult: suspend (StreamResult) -> Unit
): Stream {
- val isClosed = AtomicBoolean(false)
+ val isSendClosed = AtomicBoolean(false)
+ val isReceiveClosed = AtomicBoolean(false)
val duplexRequestBody = PipeDuplexRequestBody(request.contentType.toMediaType())
val builder = Request.Builder()
.url(request.url)
@@ -60,23 +61,21 @@ internal fun OkHttpClient.initializeStream(
}
val callRequest = builder.build()
val call = newCall(callRequest)
- call.enqueue(ResponseCallback(onResult, isClosed))
+ call.enqueue(ResponseCallback(onResult, isSendClosed))
return Stream(
onSend = { buffer ->
- if (!isClosed.get()) {
+ if (!isSendClosed.get()) {
duplexRequestBody.forConsume(buffer)
}
},
- onClose = {
- try {
- isClosed.set(true)
- call.cancel()
- duplexRequestBody.close()
- } catch (_: Throwable) {
- // No-op
- }
+ onSendClose = {
+ isSendClosed.set(true)
+ duplexRequestBody.close()
}
- )
+ ) {
+ isReceiveClosed.set(true)
+ call.cancel()
+ }
}
private class ResponseCallback(