Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split stream close functionality #58

Merged
merged 4 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,36 @@ interface BidirectionalStreamInterface<Input, Output> {
fun close()

/**
* Determine if the underlying stream is closed.
* Determine if the underlying send and receive stream is closed.
*
* @return true if the underlying stream is closed. If the stream is still open,
* @return true if the underlying send and receive stream is closed. If the stream is still open,
* this will return false.
*/
fun isClosed(): Boolean

/**
* Close the send stream. No calls to [send] are valid after calling [sendClose].
*/
fun sendClose()

/**
* Close the receive stream.
*/
fun receiveClose()

/**
* Determine if the underlying client send stream is closed.
*
* @return true if the underlying client receive stream is closed. If the stream is still open,
* this will return false.
*/
fun isSendClosed(): Boolean

/**
* Determine if the underlying client receive stream is closed.
*
* @return true if the underlying client receive stream is closed. If the stream is still open,
* this will return false.
*/
fun isReceiveClosed(): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ interface ClientOnlyStreamInterface<Input, Output> {
*/
suspend fun send(input: Input): Result<Unit>

/**
* Receive a single response and close the stream.
*
* @return the single response [StreamResult].
*/
suspend fun receiveAndClose(): StreamResult<Output>

/**
* Close the stream. No calls to [send] are valid after calling [close].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ interface ServerOnlyStreamInterface<Input, Output> {
*/
suspend fun send(input: Input): Result<Unit>

/**
* Send a request to the server over the stream and closes the request.
*
* Can only be called exactly one time when starting the stream.
*
* @param input The request message to send.
* @return [Result.success] on send success, [Result.failure] on
* any sends which are not successful.
*/
suspend fun sendAndClose(input: Input): Result<Unit>

/**
* Close the stream. No calls to [send] are valid after calling [close].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ interface HTTPClientInterface {

class Stream(
private val onSend: (Buffer) -> Unit,
private val onClose: () -> Unit
private val onSendClose: () -> Unit = {},
private val onReceiveClose: () -> Unit = {}
) {
private val isClosed = AtomicReference(false)
private val isSendClosed = AtomicReference(false)
private val isReceiveClosed = AtomicReference(false)

fun send(buffer: Buffer): Result<Unit> {
if (isClosed()) {
Expand All @@ -65,13 +67,27 @@ class Stream(
}
}

fun close() {
if (!isClosed.getAndSet(true)) {
onClose()
fun sendClose() {
if (!isSendClosed.getAndSet(true)) {
onSendClose()
}
}

fun receiveClose() {
if (!isReceiveClosed.getAndSet(true)) {
onReceiveClose()
}
}

fun isClosed(): Boolean {
return isClosed.get()
return isSendClosed() && isReceiveClosed()
}

fun isSendClosed(): Boolean {
return isSendClosed.get()
}

fun isReceiveClosed(): Boolean {
return isReceiveClosed.get()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,26 @@ internal class BidirectionalStream<Input, Output>(
}

override fun close() {
stream.close()
stream.sendClose()
}

override fun isClosed(): Boolean {
return stream.isClosed()
}

override fun sendClose() {
stream.sendClose()
}

override fun receiveClose() {
stream.receiveClose()
}

override fun isSendClosed(): Boolean {
return stream.isSendClosed()
}

override fun isReceiveClosed(): Boolean {
return stream.isReceiveClosed()
}
}
10 changes: 10 additions & 0 deletions library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package build.buf.connect.impl

import build.buf.connect.BidirectionalStreamInterface
import build.buf.connect.ClientOnlyStreamInterface
import build.buf.connect.StreamResult

/**
* Concrete implementation of [ClientOnlyStreamInterface].
Expand All @@ -27,6 +28,15 @@ internal class ClientOnlyStream<Input, Output>(
return messageStream.send(input)
}

override suspend fun receiveAndClose(): StreamResult<Output> {
val resultChannel = messageStream.resultChannel()
try {
return resultChannel.receive()
} finally {
resultChannel.cancel()
}
}

override fun close() {
messageStream.close()
}
Expand Down
27 changes: 18 additions & 9 deletions library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,27 @@ class ProtocolClient(
channel.send(result)
}
continuation.invokeOnCancellation {
httpStream.close()
httpStream.sendClose()
httpStream.receiveClose()
}
val stream = Stream(
onSend = { buffer ->
httpStream.send(streamFunc.requestBodyFunction(buffer))
},
onReceiveClose = {
httpStream.receiveClose()
},
onSendClose = {
httpStream.sendClose()
}
)
channel.invokeOnClose {
// Receive channel is closed so the stream's receive will be closed.
stream.receiveClose()
}
continuation.resume(
BidirectionalStream(
Stream(
onSend = { buffer ->
httpStream.send(streamFunc.requestBodyFunction(buffer))
},
onClose = {
httpStream.close()
}
),
stream,
requestCodec,
channel
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ internal class ServerOnlyStream<Input, Output>(
return messageStream.resultChannel()
}

override suspend fun sendAndClose(input: Input): Result<Unit> {
return try {
messageStream.send(input)
} finally {
messageStream.close()
}
}

override suspend fun send(input: Input): Result<Unit> {
return messageStream.send(input)
}
Expand Down
23 changes: 11 additions & 12 deletions okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ internal fun OkHttpClient.initializeStream(
request: HTTPRequest,
onResult: suspend (StreamResult<Buffer>) -> Unit
): Stream {
val isClosed = AtomicBoolean(false)
val isSendClosed = AtomicBoolean(false)
val isReceiveClosed = AtomicBoolean(false)
val duplexRequestBody = PipeDuplexRequestBody(request.contentType.toMediaType())
val builder = Request.Builder()
.url(request.url)
Expand All @@ -60,23 +61,21 @@ internal fun OkHttpClient.initializeStream(
}
val callRequest = builder.build()
val call = newCall(callRequest)
call.enqueue(ResponseCallback(onResult, isClosed))
call.enqueue(ResponseCallback(onResult, isSendClosed))
return Stream(
onSend = { buffer ->
if (!isClosed.get()) {
if (!isSendClosed.get()) {
duplexRequestBody.forConsume(buffer)
}
},
onClose = {
try {
isClosed.set(true)
call.cancel()
duplexRequestBody.close()
} catch (_: Throwable) {
// No-op
}
onSendClose = {
isSendClosed.set(true)
duplexRequestBody.close()
}
)
) {
isReceiveClosed.set(true)
call.cancel()
}
}

private class ResponseCallback(
Expand Down
Loading