Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for accessing headers and trailers to streaming calls #171

Merged
merged 4 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.connectrpc

import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.ReceiveChannel

/**
Expand All @@ -27,6 +28,25 @@ interface BidirectionalStreamInterface<Input, Output> {
*/
fun responseChannel(): ReceiveChannel<Output>

/**
* The response headers. This value will become available before any output
* messages become available from the [responseChannel] and before trailers
* are available from [responseTrailers]. If the stream fails before headers
* are ever received, this will complete with an empty value. The
* [ReceiveChannel.receive] method of [responseChannel] can be used to
* recover the exception that caused such a failure.
*/
fun responseHeaders(): Deferred<Headers>

/**
* The response trailers. This value will not become available until the entire
* RPC operation is complete. If the stream fails before trailers are ever
* received, this will complete with an empty value. The [ReceiveChannel.receive]
* method of [responseChannel] can be used to recover the exception that caused
* such a failure.
*/
fun responseTrailers(): Deferred<Headers>

/**
* Send a request to the server over the stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.connectrpc

import kotlinx.coroutines.Deferred

/**
* Represents a client-only stream (a stream where the client streams data to the server and
* eventually receives a response) that can send request messages and initiate closes.
Expand All @@ -34,6 +36,24 @@ interface ClientOnlyStreamInterface<Input, Output> {
*/
suspend fun receiveAndClose(): Output

/**
* The response headers. This value will become available before any call to
* [receiveAndClose] completes and before trailers are available from
* [responseTrailers] (though these may occur nearly simultaneously). If the
* stream fails before headers are ever received, this will complete with an
* empty value. The [receiveAndClose] method can be used to recover the
* exception that caused such a failure.
*/
fun responseHeaders(): Deferred<Headers>

/**
* The response trailers. This value will not become available until the entire
* RPC operation is complete. If the stream fails before trailers are ever
* received, this will complete with an empty value. The [receiveAndClose]
* method can be used to recover the exception that caused such a failure.
*/
fun responseTrailers(): Deferred<Headers>

/**
* Close the stream. No calls to [send] are valid after calling [sendClose].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.connectrpc

import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.ReceiveChannel

/**
Expand All @@ -28,6 +29,25 @@ interface ServerOnlyStreamInterface<Input, Output> {
*/
fun responseChannel(): ReceiveChannel<Output>

/**
* The response headers. This value will become available before any output
* messages become available from the [responseChannel] and before trailers
* are available from [responseTrailers]. If the stream fails before headers
* are ever received, this will complete with an empty value. The
* [ReceiveChannel.receive] method of [responseChannel] can be used to
* recover the exception that caused such a failure.
*/
fun responseHeaders(): Deferred<Headers>

/**
* The response trailers. This value will not become available until the entire
* RPC operation is complete. If the stream fails before trailers are ever
* received, this will complete with an empty value. The [ReceiveChannel.receive]
* method of [responseChannel] can be used to recover the exception that caused
* such a failure.
*/
fun responseTrailers(): Deferred<Headers>

/**
* Send a request to the server over the stream and closes the request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Stream(
private val isReceiveClosed = AtomicReference(false)

fun send(buffer: Buffer): Result<Unit> {
if (isClosed()) {
if (isSendClosed()) {
return Result.failure(IllegalStateException("cannot send. underlying stream is closed"))
}
return try {
Expand All @@ -76,11 +76,14 @@ class Stream(
fun receiveClose() {
if (!isReceiveClosed.getAndSet(true)) {
onReceiveClose()
// closing receive side implicitly closes send side, too
isSendClosed.set(true)
Copy link
Contributor

@pkwarren pkwarren Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call sendClose() instead here so the onSendClose() callback is always invoked? Do we want to invoke this in a try / finally to ensure it will always be run even if an exception occurs in onReceiveClose?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want onSendClose() to be invoked because that will try to half-close the stream (which should fail since the thing is already done at this point). And I don't think we want to call sendClose() first (before doing the receive side) because that is just an extra, unnecessary network packet to send the half-close (technically an empty "end-of-stream" data frame) when the cancellation is all that is needed. As the comment says, closing the receive side of the stream implicitly closes the send -- so book-keeping is all that is needed here and no other action. I can add comments to this effect.

Unclear about using try/finally. On the one hand, if onReceiveClose fails, then the stream may not be closed so that's an argument to not use finally. However, it doesn't really make much sense to leave isSendClosed set to false when isReceiveClosed is set to true. So I guess I've talked myself into needing a finally here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use try/finally and added a comment as to why this isn't calling sendClose().

}
}

// TODO: remove this method as it is redundant with receive closed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fine to remove the method if it isn't needed. I don't really see any consumers of this anywhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's referenced indirectly in all three of the stream interfaces and their impls. I figured I'd put that diff churn in a separate PR where I also hope to do some other improvement/overhauling of the stream interfaces.

fun isClosed(): Boolean {
return isSendClosed() && isReceiveClosed()
return isReceiveClosed()
}

fun isSendClosed(): Boolean {
Expand Down
16 changes: 14 additions & 2 deletions library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package com.connectrpc.impl

import com.connectrpc.BidirectionalStreamInterface
import com.connectrpc.Codec
import com.connectrpc.Headers
import com.connectrpc.http.Stream
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import java.lang.Exception
Expand All @@ -27,7 +29,9 @@ import java.lang.Exception
internal class BidirectionalStream<Input, Output>(
val stream: Stream,
private val requestCodec: Codec<Input>,
private val receiveChannel: Channel<Output>,
private val responseChannel: Channel<Output>,
private val responseHeaders: Deferred<Headers>,
private val responseTrailers: Deferred<Headers>,
) : BidirectionalStreamInterface<Input, Output> {

override suspend fun send(input: Input): Result<Unit> {
Expand All @@ -40,7 +44,15 @@ internal class BidirectionalStream<Input, Output>(
}

override fun responseChannel(): ReceiveChannel<Output> {
return receiveChannel
return responseChannel
}

override fun responseHeaders(): Deferred<Headers> {
return responseHeaders
}

override fun responseTrailers(): Deferred<Headers> {
return responseTrailers
}

override fun isClosed(): Boolean {
Expand Down
10 changes: 10 additions & 0 deletions library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import com.connectrpc.BidirectionalStreamInterface
import com.connectrpc.ClientOnlyStreamInterface
import com.connectrpc.Code
import com.connectrpc.ConnectException
import com.connectrpc.Headers
import kotlinx.coroutines.Deferred

/**
* Concrete implementation of [ClientOnlyStreamInterface].
Expand All @@ -44,6 +46,14 @@ internal class ClientOnlyStream<Input, Output>(
}
}

override fun responseHeaders(): Deferred<Headers> {
return messageStream.responseHeaders()
}

override fun responseTrailers(): Deferred<Headers> {
return messageStream.responseTrailers()
}

override fun sendClose() {
return messageStream.sendClose()
}
Expand Down
37 changes: 30 additions & 7 deletions library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.connectrpc.http.HTTPClientInterface
import com.connectrpc.http.HTTPRequest
import com.connectrpc.http.Stream
import com.connectrpc.protocols.GETConfiguration
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.suspendCancellableCoroutine
import java.net.URL
Expand Down Expand Up @@ -178,6 +179,8 @@ class ProtocolClient(
headers: Headers,
): BidirectionalStream<Input, Output> = suspendCancellableCoroutine { continuation ->
val channel = Channel<Output>(1)
val responseHeaders = CompletableDeferred<Headers>()
val responseTrailers = CompletableDeferred<Headers>()
val requestCodec = config.serializationStrategy.codec(methodSpec.requestClass)
val responseCodec = config.serializationStrategy.codec(methodSpec.responseClass)
val request = HTTPRequest(
Expand All @@ -197,32 +200,51 @@ class ProtocolClient(
// Pass through the interceptor chain.
when (val streamResult = streamFunc.streamResultFunction(initialResult)) {
is StreamResult.Headers -> {
// Not currently used except for interceptors.
// If this is incorrectly called 2x, only the first result is used.
// Subsequent calls to complete will be ignored.
responseHeaders.complete(streamResult.headers)
}

is StreamResult.Message -> {
// Just in case protocol impl failed to provide StreamResult.Headers,
// treat headers as empty. This is a no-op if we did correctly receive
// them already.
responseHeaders.complete(emptyMap())
try {
val message = responseCodec.deserialize(
streamResult.message,
)
channel.send(message)
} catch (e: Throwable) {
// TODO: setting isComplete, responseTrailers, and RPC status
// here seems wrong. What would prevent the call to
// channel.send such that we don't bother getting the
// actual result/trailers from the server?
jhump marked this conversation as resolved.
Show resolved Hide resolved
isComplete = true
channel.close(ConnectException(Code.UNKNOWN, exception = e))
try {
channel.close(ConnectException(Code.UNKNOWN, exception = e))
} finally {
responseTrailers.complete(emptyMap())
}
}
}

is StreamResult.Complete -> {
// This is a no-op if we already received a StreamResult.Headers.
responseHeaders.complete(emptyMap())
isComplete = true
when (streamResult.code) {
Code.OK -> channel.close()
else -> channel.close(streamResult.connectException() ?: ConnectException(code = streamResult.code))
try {
when (streamResult.code) {
Code.OK -> channel.close()
else -> channel.close(streamResult.connectException() ?: ConnectException(code = streamResult.code))
}
} finally {
responseTrailers.complete(streamResult.trailers)
}
}
}
}
continuation.invokeOnCancellation {
httpStream.sendClose()
httpStream.receiveClose()
}
val stream = Stream(
Expand All @@ -237,14 +259,15 @@ class ProtocolClient(
},
)
channel.invokeOnClose {
// Receive channel is closed so the stream's receive will be closed.
stream.receiveClose()
}
continuation.resume(
BidirectionalStream(
stream,
requestCodec,
channel,
responseHeaders,
responseTrailers,
),
)
}
Expand Down
10 changes: 10 additions & 0 deletions library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package com.connectrpc.impl

import com.connectrpc.BidirectionalStreamInterface
import com.connectrpc.Headers
import com.connectrpc.ServerOnlyStreamInterface
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.ReceiveChannel

/**
Expand All @@ -28,6 +30,14 @@ internal class ServerOnlyStream<Input, Output>(
return messageStream.responseChannel()
}

override fun responseHeaders(): Deferred<Headers> {
return messageStream.responseHeaders()
}

override fun responseTrailers(): Deferred<Headers> {
return messageStream.responseTrailers()
}

override suspend fun sendAndClose(input: Input): Result<Unit> {
try {
return messageStream.send(input)
Expand Down
2 changes: 2 additions & 0 deletions okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ internal fun OkHttpClient.initializeStream(
onReceiveClose = {
isReceiveClosed.set(true)
call.cancel()
// cancelling implicitly closes send-side, too
isSendClosed.set(true)
},
)
}
Expand Down