From 5e5a061e9da9ef1277efe73ce781285090e0de5c Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:21:48 -0500 Subject: [PATCH 1/4] add support for accessing headers and trailers using CompletableDeferred, which completes when the headers or trailers are received --- .../BidirectionalStreamInterface.kt | 20 ++++++++++ .../connectrpc/ClientOnlyStreamInterface.kt | 20 ++++++++++ .../connectrpc/ServerOnlyStreamInterface.kt | 20 ++++++++++ .../connectrpc/http/HTTPClientInterface.kt | 7 +++- .../connectrpc/impl/BidirectionalStream.kt | 16 +++++++- .../com/connectrpc/impl/ClientOnlyStream.kt | 10 +++++ .../com/connectrpc/impl/ProtocolClient.kt | 37 +++++++++++++++---- .../com/connectrpc/impl/ServerOnlyStream.kt | 10 +++++ .../com/connectrpc/okhttp/OkHttpStream.kt | 2 + 9 files changed, 131 insertions(+), 11 deletions(-) diff --git a/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt index 803c4ecf..87778622 100644 --- a/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt @@ -14,6 +14,7 @@ package com.connectrpc +import kotlinx.coroutines.Deferred import kotlinx.coroutines.channels.ReceiveChannel /** @@ -27,6 +28,25 @@ interface BidirectionalStreamInterface { */ fun responseChannel(): ReceiveChannel + /** + * The response headers. This value will become available before any output + * messages become available from the [responseChannel] and before trailers + * are available from [responseTrailers]. If the stream fails before headers + * are ever received, this will complete with an empty value. The + * [ReceiveChannel.receive] method of [responseChannel] can be used to + * recover the exception that caused such a failure. + */ + fun responseHeaders(): Deferred + + /** + * The response trailers. This value will not become available until the entire + * RPC operation is complete. If the stream fails before trailers are ever + * received, this will complete with an empty value. The [ReceiveChannel.receive] + * method of [responseChannel] can be used to recover the exception that caused + * such a failure. + */ + fun responseTrailers(): Deferred + /** * Send a request to the server over the stream. * diff --git a/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt index 1927fe44..be625bf4 100644 --- a/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt @@ -14,6 +14,8 @@ package com.connectrpc +import kotlinx.coroutines.Deferred + /** * Represents a client-only stream (a stream where the client streams data to the server and * eventually receives a response) that can send request messages and initiate closes. @@ -34,6 +36,24 @@ interface ClientOnlyStreamInterface { */ suspend fun receiveAndClose(): Output + /** + * The response headers. This value will become available before any call to + * [receiveAndClose] completes and before trailers are available from + * [responseTrailers] (though these may occur nearly simultaneously). If the + * stream fails before headers are ever received, this will complete with an + * empty value. The [receiveAndClose] method can be used to recover the + * exception that caused such a failure. + */ + fun responseHeaders(): Deferred + + /** + * The response trailers. This value will not become available until the entire + * RPC operation is complete. If the stream fails before trailers are ever + * received, this will complete with an empty value. The [receiveAndClose] + * method can be used to recover the exception that caused such a failure. + */ + fun responseTrailers(): Deferred + /** * Close the stream. No calls to [send] are valid after calling [sendClose]. */ diff --git a/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt index 6783ee9a..f7adbfae 100644 --- a/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt @@ -14,6 +14,7 @@ package com.connectrpc +import kotlinx.coroutines.Deferred import kotlinx.coroutines.channels.ReceiveChannel /** @@ -28,6 +29,25 @@ interface ServerOnlyStreamInterface { */ fun responseChannel(): ReceiveChannel + /** + * The response headers. This value will become available before any output + * messages become available from the [responseChannel] and before trailers + * are available from [responseTrailers]. If the stream fails before headers + * are ever received, this will complete with an empty value. The + * [ReceiveChannel.receive] method of [responseChannel] can be used to + * recover the exception that caused such a failure. + */ + fun responseHeaders(): Deferred + + /** + * The response trailers. This value will not become available until the entire + * RPC operation is complete. If the stream fails before trailers are ever + * received, this will complete with an empty value. The [ReceiveChannel.receive] + * method of [responseChannel] can be used to recover the exception that caused + * such a failure. + */ + fun responseTrailers(): Deferred + /** * Send a request to the server over the stream and closes the request. * diff --git a/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt b/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt index 9d5e92d3..ccad26b1 100644 --- a/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt @@ -56,7 +56,7 @@ class Stream( private val isReceiveClosed = AtomicReference(false) fun send(buffer: Buffer): Result { - if (isClosed()) { + if (isSendClosed()) { return Result.failure(IllegalStateException("cannot send. underlying stream is closed")) } return try { @@ -76,11 +76,14 @@ class Stream( fun receiveClose() { if (!isReceiveClosed.getAndSet(true)) { onReceiveClose() + // closing receive side implicitly closes send side, too + isSendClosed.set(true) } } + // TODO: remove this method as it is redundant with receive closed fun isClosed(): Boolean { - return isSendClosed() && isReceiveClosed() + return isReceiveClosed() } fun isSendClosed(): Boolean { diff --git a/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt b/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt index 5048b09b..9b0006e7 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt @@ -16,7 +16,9 @@ package com.connectrpc.impl import com.connectrpc.BidirectionalStreamInterface import com.connectrpc.Codec +import com.connectrpc.Headers import com.connectrpc.http.Stream +import kotlinx.coroutines.Deferred import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import java.lang.Exception @@ -27,7 +29,9 @@ import java.lang.Exception internal class BidirectionalStream( val stream: Stream, private val requestCodec: Codec, - private val receiveChannel: Channel, + private val responseChannel: Channel, + private val responseHeaders: Deferred, + private val responseTrailers: Deferred, ) : BidirectionalStreamInterface { override suspend fun send(input: Input): Result { @@ -40,7 +44,15 @@ internal class BidirectionalStream( } override fun responseChannel(): ReceiveChannel { - return receiveChannel + return responseChannel + } + + override fun responseHeaders(): Deferred { + return responseHeaders + } + + override fun responseTrailers(): Deferred { + return responseTrailers } override fun isClosed(): Boolean { diff --git a/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt b/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt index b1e9f052..a712a6e4 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt @@ -18,6 +18,8 @@ import com.connectrpc.BidirectionalStreamInterface import com.connectrpc.ClientOnlyStreamInterface import com.connectrpc.Code import com.connectrpc.ConnectException +import com.connectrpc.Headers +import kotlinx.coroutines.Deferred /** * Concrete implementation of [ClientOnlyStreamInterface]. @@ -44,6 +46,14 @@ internal class ClientOnlyStream( } } + override fun responseHeaders(): Deferred { + return messageStream.responseHeaders() + } + + override fun responseTrailers(): Deferred { + return messageStream.responseTrailers() + } + override fun sendClose() { return messageStream.sendClose() } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt index 79aa94bc..6dcae044 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt @@ -31,6 +31,7 @@ import com.connectrpc.http.HTTPClientInterface import com.connectrpc.http.HTTPRequest import com.connectrpc.http.Stream import com.connectrpc.protocols.GETConfiguration +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.suspendCancellableCoroutine import java.net.URL @@ -178,6 +179,8 @@ class ProtocolClient( headers: Headers, ): BidirectionalStream = suspendCancellableCoroutine { continuation -> val channel = Channel(1) + val responseHeaders = CompletableDeferred() + val responseTrailers = CompletableDeferred() val requestCodec = config.serializationStrategy.codec(methodSpec.requestClass) val responseCodec = config.serializationStrategy.codec(methodSpec.responseClass) val request = HTTPRequest( @@ -197,32 +200,51 @@ class ProtocolClient( // Pass through the interceptor chain. when (val streamResult = streamFunc.streamResultFunction(initialResult)) { is StreamResult.Headers -> { - // Not currently used except for interceptors. + // If this is incorrectly called 2x, only the first result is used. + // Subsequent calls to complete will be ignored. + responseHeaders.complete(streamResult.headers) } is StreamResult.Message -> { + // Just in case protocol impl failed to provide StreamResult.Headers, + // treat headers as empty. This is a no-op if we did correctly receive + // them already. + responseHeaders.complete(emptyMap()) try { val message = responseCodec.deserialize( streamResult.message, ) channel.send(message) } catch (e: Throwable) { + // TODO: setting isComplete, responseTrailers, and RPC status + // here seems wrong. What would prevent the call to + // channel.send such that we don't bother getting the + // actual result/trailers from the server? isComplete = true - channel.close(ConnectException(Code.UNKNOWN, exception = e)) + try { + channel.close(ConnectException(Code.UNKNOWN, exception = e)) + } finally { + responseTrailers.complete(emptyMap()) + } } } is StreamResult.Complete -> { + // This is a no-op if we already received a StreamResult.Headers. + responseHeaders.complete(emptyMap()) isComplete = true - when (streamResult.code) { - Code.OK -> channel.close() - else -> channel.close(streamResult.connectException() ?: ConnectException(code = streamResult.code)) + try { + when (streamResult.code) { + Code.OK -> channel.close() + else -> channel.close(streamResult.connectException() ?: ConnectException(code = streamResult.code)) + } + } finally { + responseTrailers.complete(streamResult.trailers) } } } } continuation.invokeOnCancellation { - httpStream.sendClose() httpStream.receiveClose() } val stream = Stream( @@ -237,7 +259,6 @@ class ProtocolClient( }, ) channel.invokeOnClose { - // Receive channel is closed so the stream's receive will be closed. stream.receiveClose() } continuation.resume( @@ -245,6 +266,8 @@ class ProtocolClient( stream, requestCodec, channel, + responseHeaders, + responseTrailers, ), ) } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt b/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt index b2d01bbd..d1fa9b3e 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt @@ -15,7 +15,9 @@ package com.connectrpc.impl import com.connectrpc.BidirectionalStreamInterface +import com.connectrpc.Headers import com.connectrpc.ServerOnlyStreamInterface +import kotlinx.coroutines.Deferred import kotlinx.coroutines.channels.ReceiveChannel /** @@ -28,6 +30,14 @@ internal class ServerOnlyStream( return messageStream.responseChannel() } + override fun responseHeaders(): Deferred { + return messageStream.responseHeaders() + } + + override fun responseTrailers(): Deferred { + return messageStream.responseTrailers() + } + override suspend fun sendAndClose(input: Input): Result { try { return messageStream.send(input) diff --git a/okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt b/okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt index e1fbbfcf..ef1bdb24 100644 --- a/okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt +++ b/okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt @@ -76,6 +76,8 @@ internal fun OkHttpClient.initializeStream( onReceiveClose = { isReceiveClosed.set(true) call.cancel() + // cancelling implicitly closes send-side, too + isSendClosed.set(true) }, ) } From 756c3b0081eaf4359a229420742ee3ea25c71e49 Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Thu, 7 Dec 2023 16:39:40 -0500 Subject: [PATCH 2/4] add header usage to conformance test cases --- .../conformance/java/ConformanceTest.kt | 22 ++++++++++++++++--- .../conformance/javalite/ConformanceTest.kt | 22 ++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt index 800ad92e..da8dd503 100644 --- a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt +++ b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt @@ -51,9 +51,14 @@ import java.util.concurrent.TimeUnit @RunWith(Parameterized::class) class ConformanceTest( - protocol: NetworkProtocol, + private val protocol: NetworkProtocol, serverType: ServerType, ) : BaseConformanceTest(protocol, serverType) { + companion object { + private val responseHeaders = mapOf(Pair("x-grpc-test-echo-initial", listOf("test_initial_metadata_value"))) + private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL")), /* base64-encoded 0x0a0b0a0b0a0b */) + private val requestHeaders = responseHeaders + responseTrailers + } private lateinit var unimplementedServiceClient: UnimplementedServiceClient private lateinit var testServiceConnectClient: TestServiceClient @@ -68,7 +73,7 @@ class ConformanceTest( @Test fun serverStreaming(): Unit = runBlocking { val sizes = listOf(512_000, 16, 2_028, 65_536) - val stream = testServiceConnectClient.streamingOutputCall() + val stream = testServiceConnectClient.streamingOutputCall(requestHeaders) val params = sizes.map { responseParameters { size = it } }.toList() stream.sendAndClose( streamingOutputCallRequest { @@ -76,12 +81,15 @@ class ConformanceTest( responseParameters += params }, ).getOrThrow() + assertThat(stream.responseHeaders().await()).containsAllEntriesOf(responseHeaders) val responses = mutableListOf() for (response in stream.responseChannel()) { responses.add(response) } assertThat(responses.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE)) assertThat(responses.map { it.payload.body.size() }).isEqualTo(sizes) + assertThat(stream.responseTrailers().isCompleted).isTrue() + assertThat(stream.responseTrailers().await()).containsAllEntriesOf(responseTrailers) } @Test @@ -673,7 +681,7 @@ class ConformanceTest( @Test fun clientStreaming(): Unit = runBlocking { - val stream = testServiceConnectClient.streamingInputCall(emptyMap()) + val stream = testServiceConnectClient.streamingInputCall() var sum = 0 listOf(256000, 8, 1024, 32768).forEach { size -> stream.send( @@ -692,6 +700,14 @@ class ConformanceTest( try { val response = stream.receiveAndClose() assertThat(response.aggregatedPayloadSize).isEqualTo(sum) + assertThat(stream.responseHeaders().isCompleted).isTrue() + assertThat(stream.responseTrailers().isCompleted).isTrue() + assertThat(stream.responseHeaders().await()).isNotEmpty() + if (protocol != NetworkProtocol.CONNECT) { + // gRPC and gRPC-web communicate RPC status in trailers, so + // they should always have something. + assertThat(stream.responseTrailers().await()).isNotEmpty() + } } finally { countDownLatch.countDown() } diff --git a/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt b/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt index 3c8e552a..e4b91fb1 100644 --- a/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt +++ b/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt @@ -51,9 +51,14 @@ import java.util.concurrent.TimeUnit @RunWith(Parameterized::class) class ConformanceTest( - protocol: NetworkProtocol, + private val protocol: NetworkProtocol, serverType: ServerType, ) : BaseConformanceTest(protocol, serverType) { + companion object { + private val responseHeaders = mapOf(Pair("x-grpc-test-echo-initial", listOf("test_initial_metadata_value"))) + private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL")), /* base64-encoded 0x0a0b0a0b0a0b */) + private val requestHeaders = responseHeaders + responseTrailers + } private lateinit var unimplementedServiceClient: UnimplementedServiceClient private lateinit var testServiceConnectClient: TestServiceClient @@ -68,7 +73,7 @@ class ConformanceTest( @Test fun serverStreaming(): Unit = runBlocking { val sizes = listOf(512_000, 16, 2_028, 65_536) - val stream = testServiceConnectClient.streamingOutputCall() + val stream = testServiceConnectClient.streamingOutputCall(requestHeaders) val params = sizes.map { responseParameters { size = it } }.toList() stream.sendAndClose( streamingOutputCallRequest { @@ -76,12 +81,15 @@ class ConformanceTest( responseParameters += params }, ).getOrThrow() + assertThat(stream.responseHeaders().await()).containsAllEntriesOf(responseHeaders) val responses = mutableListOf() for (response in stream.responseChannel()) { responses.add(response) } assertThat(responses.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE)) assertThat(responses.map { it.payload.body.size() }).isEqualTo(sizes) + assertThat(stream.responseTrailers().isCompleted).isTrue() + assertThat(stream.responseTrailers().await()).containsAllEntriesOf(responseTrailers) } @Test @@ -673,7 +681,7 @@ class ConformanceTest( @Test fun clientStreaming(): Unit = runBlocking { - val stream = testServiceConnectClient.streamingInputCall(emptyMap()) + val stream = testServiceConnectClient.streamingInputCall() var sum = 0 listOf(256000, 8, 1024, 32768).forEach { size -> stream.send( @@ -692,6 +700,14 @@ class ConformanceTest( try { val response = stream.receiveAndClose() assertThat(response.aggregatedPayloadSize).isEqualTo(sum) + assertThat(stream.responseHeaders().isCompleted).isTrue() + assertThat(stream.responseTrailers().isCompleted).isTrue() + assertThat(stream.responseHeaders().await()).isNotEmpty() + if (protocol != NetworkProtocol.CONNECT) { + // gRPC and gRPC-web communicate RPC status in trailers, so + // they should always have something. + assertThat(stream.responseTrailers().await()).isNotEmpty() + } } finally { countDownLatch.countDown() } From 371f1c0c435030a36cd003a62be7b8012f69f30b Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:06:19 -0500 Subject: [PATCH 3/4] review feedback --- .../connectrpc/conformance/java/ConformanceTest.kt | 2 +- .../conformance/javalite/ConformanceTest.kt | 2 +- .../com/connectrpc/http/HTTPClientInterface.kt | 14 +++++++++++--- .../kotlin/com/connectrpc/impl/ProtocolClient.kt | 4 ---- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt index da8dd503..099353ea 100644 --- a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt +++ b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt @@ -56,7 +56,7 @@ class ConformanceTest( ) : BaseConformanceTest(protocol, serverType) { companion object { private val responseHeaders = mapOf(Pair("x-grpc-test-echo-initial", listOf("test_initial_metadata_value"))) - private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL")), /* base64-encoded 0x0a0b0a0b0a0b */) + private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL"))) // base64-encoded 0x0a0b0a0b0a0b private val requestHeaders = responseHeaders + responseTrailers } diff --git a/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt b/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt index e4b91fb1..21c44643 100644 --- a/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt +++ b/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt @@ -56,7 +56,7 @@ class ConformanceTest( ) : BaseConformanceTest(protocol, serverType) { companion object { private val responseHeaders = mapOf(Pair("x-grpc-test-echo-initial", listOf("test_initial_metadata_value"))) - private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL")), /* base64-encoded 0x0a0b0a0b0a0b */) + private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL"))) // base64-encoded 0x0a0b0a0b0a0b private val requestHeaders = responseHeaders + responseTrailers } diff --git a/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt b/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt index ccad26b1..4f1e2baf 100644 --- a/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt @@ -75,9 +75,17 @@ class Stream( fun receiveClose() { if (!isReceiveClosed.getAndSet(true)) { - onReceiveClose() - // closing receive side implicitly closes send side, too - isSendClosed.set(true) + try { + onReceiveClose() + } finally { + // When receive side is closed, the send side is + // implicitly closed as well. + // We don't use sendClose() because we don't want to + // invoke onSendClose() since that will try to actually + // half-close the HTTP stream, which will fail since the + // closing the receive side cancels the entire thing. + isSendClosed.set(true) + } } } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt index 6dcae044..6b5345a9 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt @@ -216,10 +216,6 @@ class ProtocolClient( ) channel.send(message) } catch (e: Throwable) { - // TODO: setting isComplete, responseTrailers, and RPC status - // here seems wrong. What would prevent the call to - // channel.send such that we don't bother getting the - // actual result/trailers from the server? isComplete = true try { channel.close(ConnectException(Code.UNKNOWN, exception = e)) From 7a77ef4072f4a1739966fb030f806941ea8453b6 Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:10:23 -0500 Subject: [PATCH 4/4] remove racy assertion: trailers could be completing concurrently with test code, so not safe to assert they are already complete --- .../kotlin/com/connectrpc/conformance/java/ConformanceTest.kt | 2 -- .../com/connectrpc/conformance/javalite/ConformanceTest.kt | 2 -- 2 files changed, 4 deletions(-) diff --git a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt index 099353ea..39f22722 100644 --- a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt +++ b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt @@ -88,7 +88,6 @@ class ConformanceTest( } assertThat(responses.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE)) assertThat(responses.map { it.payload.body.size() }).isEqualTo(sizes) - assertThat(stream.responseTrailers().isCompleted).isTrue() assertThat(stream.responseTrailers().await()).containsAllEntriesOf(responseTrailers) } @@ -701,7 +700,6 @@ class ConformanceTest( val response = stream.receiveAndClose() assertThat(response.aggregatedPayloadSize).isEqualTo(sum) assertThat(stream.responseHeaders().isCompleted).isTrue() - assertThat(stream.responseTrailers().isCompleted).isTrue() assertThat(stream.responseHeaders().await()).isNotEmpty() if (protocol != NetworkProtocol.CONNECT) { // gRPC and gRPC-web communicate RPC status in trailers, so diff --git a/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt b/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt index 21c44643..5f3e813c 100644 --- a/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt +++ b/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt @@ -88,7 +88,6 @@ class ConformanceTest( } assertThat(responses.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE)) assertThat(responses.map { it.payload.body.size() }).isEqualTo(sizes) - assertThat(stream.responseTrailers().isCompleted).isTrue() assertThat(stream.responseTrailers().await()).containsAllEntriesOf(responseTrailers) } @@ -701,7 +700,6 @@ class ConformanceTest( val response = stream.receiveAndClose() assertThat(response.aggregatedPayloadSize).isEqualTo(sum) assertThat(stream.responseHeaders().isCompleted).isTrue() - assertThat(stream.responseTrailers().isCompleted).isTrue() assertThat(stream.responseHeaders().await()).isNotEmpty() if (protocol != NetworkProtocol.CONNECT) { // gRPC and gRPC-web communicate RPC status in trailers, so