Skip to content

Commit

Permalink
Split stream close functionality (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
buildbreaker authored Aug 11, 2023
1 parent e78b96b commit a16ff78
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,36 @@ interface BidirectionalStreamInterface<Input, Output> {
fun close()

/**
* Determine if the underlying stream is closed.
* Determine if the underlying send and receive stream is closed.
*
* @return true if the underlying stream is closed. If the stream is still open,
* @return true if the underlying send and receive stream is closed. If the stream is still open,
* this will return false.
*/
fun isClosed(): Boolean

/**
* Close the send stream. No calls to [send] are valid after calling [sendClose].
*/
fun sendClose()

/**
* Close the receive stream.
*/
fun receiveClose()

/**
* Determine if the underlying client send stream is closed.
*
* @return true if the underlying client receive stream is closed. If the stream is still open,
* this will return false.
*/
fun isSendClosed(): Boolean

/**
* Determine if the underlying client receive stream is closed.
*
* @return true if the underlying client receive stream is closed. If the stream is still open,
* this will return false.
*/
fun isReceiveClosed(): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ interface ClientOnlyStreamInterface<Input, Output> {
*/
suspend fun send(input: Input): Result<Unit>

/**
* Receive a single response and close the stream.
*
* @return the single response [StreamResult].
*/
suspend fun receiveAndClose(): StreamResult<Output>

/**
* Close the stream. No calls to [send] are valid after calling [close].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ interface ServerOnlyStreamInterface<Input, Output> {
*/
suspend fun send(input: Input): Result<Unit>

/**
* Send a request to the server over the stream and closes the request.
*
* Can only be called exactly one time when starting the stream.
*
* @param input The request message to send.
* @return [Result.success] on send success, [Result.failure] on
* any sends which are not successful.
*/
suspend fun sendAndClose(input: Input): Result<Unit>

/**
* Close the stream. No calls to [send] are valid after calling [close].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ interface HTTPClientInterface {

class Stream(
private val onSend: (Buffer) -> Unit,
private val onClose: () -> Unit
private val onSendClose: () -> Unit = {},
private val onReceiveClose: () -> Unit = {}
) {
private val isClosed = AtomicReference(false)
private val isSendClosed = AtomicReference(false)
private val isReceiveClosed = AtomicReference(false)

fun send(buffer: Buffer): Result<Unit> {
if (isClosed()) {
Expand All @@ -65,13 +67,27 @@ class Stream(
}
}

fun close() {
if (!isClosed.getAndSet(true)) {
onClose()
fun sendClose() {
if (!isSendClosed.getAndSet(true)) {
onSendClose()
}
}

fun receiveClose() {
if (!isReceiveClosed.getAndSet(true)) {
onReceiveClose()
}
}

fun isClosed(): Boolean {
return isClosed.get()
return isSendClosed() && isReceiveClosed()
}

fun isSendClosed(): Boolean {
return isSendClosed.get()
}

fun isReceiveClosed(): Boolean {
return isReceiveClosed.get()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,26 @@ internal class BidirectionalStream<Input, Output>(
}

override fun close() {
stream.close()
stream.sendClose()
}

override fun isClosed(): Boolean {
return stream.isClosed()
}

override fun sendClose() {
stream.sendClose()
}

override fun receiveClose() {
stream.receiveClose()
}

override fun isSendClosed(): Boolean {
return stream.isSendClosed()
}

override fun isReceiveClosed(): Boolean {
return stream.isReceiveClosed()
}
}
10 changes: 10 additions & 0 deletions library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package build.buf.connect.impl

import build.buf.connect.BidirectionalStreamInterface
import build.buf.connect.ClientOnlyStreamInterface
import build.buf.connect.StreamResult

/**
* Concrete implementation of [ClientOnlyStreamInterface].
Expand All @@ -27,6 +28,15 @@ internal class ClientOnlyStream<Input, Output>(
return messageStream.send(input)
}

override suspend fun receiveAndClose(): StreamResult<Output> {
val resultChannel = messageStream.resultChannel()
try {
return resultChannel.receive()
} finally {
resultChannel.cancel()
}
}

override fun close() {
messageStream.close()
}
Expand Down
27 changes: 18 additions & 9 deletions library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,27 @@ class ProtocolClient(
channel.send(result)
}
continuation.invokeOnCancellation {
httpStream.close()
httpStream.sendClose()
httpStream.receiveClose()
}
val stream = Stream(
onSend = { buffer ->
httpStream.send(streamFunc.requestBodyFunction(buffer))
},
onReceiveClose = {
httpStream.receiveClose()
},
onSendClose = {
httpStream.sendClose()
}
)
channel.invokeOnClose {
// Receive channel is closed so the stream's receive will be closed.
stream.receiveClose()
}
continuation.resume(
BidirectionalStream(
Stream(
onSend = { buffer ->
httpStream.send(streamFunc.requestBodyFunction(buffer))
},
onClose = {
httpStream.close()
}
),
stream,
requestCodec,
channel
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ internal class ServerOnlyStream<Input, Output>(
return messageStream.resultChannel()
}

override suspend fun sendAndClose(input: Input): Result<Unit> {
return try {
messageStream.send(input)
} finally {
messageStream.close()
}
}

override suspend fun send(input: Input): Result<Unit> {
return messageStream.send(input)
}
Expand Down
23 changes: 11 additions & 12 deletions okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ internal fun OkHttpClient.initializeStream(
request: HTTPRequest,
onResult: suspend (StreamResult<Buffer>) -> Unit
): Stream {
val isClosed = AtomicBoolean(false)
val isSendClosed = AtomicBoolean(false)
val isReceiveClosed = AtomicBoolean(false)
val duplexRequestBody = PipeDuplexRequestBody(request.contentType.toMediaType())
val builder = Request.Builder()
.url(request.url)
Expand All @@ -60,23 +61,21 @@ internal fun OkHttpClient.initializeStream(
}
val callRequest = builder.build()
val call = newCall(callRequest)
call.enqueue(ResponseCallback(onResult, isClosed))
call.enqueue(ResponseCallback(onResult, isSendClosed))
return Stream(
onSend = { buffer ->
if (!isClosed.get()) {
if (!isSendClosed.get()) {
duplexRequestBody.forConsume(buffer)
}
},
onClose = {
try {
isClosed.set(true)
call.cancel()
duplexRequestBody.close()
} catch (_: Throwable) {
// No-op
}
onSendClose = {
isSendClosed.set(true)
duplexRequestBody.close()
}
)
) {
isReceiveClosed.set(true)
call.cancel()
}
}

private class ResponseCallback(
Expand Down

0 comments on commit a16ff78

Please sign in to comment.