From 0894da378101e43a628d4f19dc2ce72b4bad08ea Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Wed, 31 Jan 2024 16:30:16 -0500 Subject: [PATCH 1/3] provide way to auto-close streams --- .../connectrpc/conformance/client/Client.kt | 94 ++++++++----------- .../client/adapt/BidiStreamClient.kt | 20 +++- .../client/adapt/ClientStreamClient.kt | 18 +++- .../conformance/client/adapt/Closeable.kt | 43 +++++++++ .../conformance/client/adapt/Invoker.kt | 4 +- .../conformance/client/adapt/RequestStream.kt | 16 +++- .../client/adapt/ResponseStream.kt | 10 +- .../client/adapt/ServerStreamClient.kt | 14 +++ 8 files changed, 150 insertions(+), 69 deletions(-) create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Closeable.kt diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt index a10820ce..1dd417a1 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt @@ -36,6 +36,7 @@ import com.connectrpc.conformance.client.adapt.Invoker import com.connectrpc.conformance.client.adapt.ResponseStream import com.connectrpc.conformance.client.adapt.ServerStreamClient import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.client.adapt.execute import com.connectrpc.http.HTTPClientInterface import com.connectrpc.impl.ProtocolClient import com.connectrpc.okhttp.ConnectOkHttpClient @@ -46,7 +47,6 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import okhttp3.OkHttpClient -import okhttp3.Protocol import okhttp3.tls.HandshakeCertificates import okhttp3.tls.HeldCertificate import java.security.KeyFactory @@ -163,8 +163,7 @@ class Client( ) { throw RuntimeException("client stream calls can only support `BeforeCloseSend` and 'AfterCloseSendMs' cancellation field, instead got ${req.cancel!!::class.simpleName}") } - val stream = client.execute(req.requestHeaders) - try { + client.execute(req.requestHeaders) { stream -> var numUnsent = 0 for (i in req.requestMessages.indices) { if (req.requestDelayMs > 0) { @@ -184,12 +183,12 @@ class Client( } when (val cancel = req.cancel) { is Cancel.BeforeCloseSend -> { - stream.cancel() + stream.close() } is Cancel.AfterCloseSendMs -> { launch { delay(cancel.millis.toLong()) - stream.cancel() + stream.close() } } else -> { @@ -197,9 +196,7 @@ class Client( // So this case means no cancellation. } } - return@coroutineScope unaryResult(numUnsent, stream.closeAndReceive()) - } finally { - stream.cancel() + unaryResult(numUnsent, stream.closeAndReceive()) } } @@ -220,37 +217,34 @@ class Client( throw RuntimeException("server stream calls can only support `AfterCloseSendMs` and 'AfterNumResponses' cancellation field, instead got ${req.cancel!!::class.simpleName}") } val msg = fromAny(req.requestMessages[0], client.reqTemplate, SERVER_STREAM_REQUEST_NAME) - val stream: ResponseStream + var sent = false try { - // TODO: should this throw? Maybe not... - // An alternative would be to have it return a - // stream that throws the relevant exception in - // calls to receive. - stream = client.execute(msg, req.requestHeaders) + return client.execute(msg, req.requestHeaders) { stream -> + sent = true + val cancel = req.cancel + if (cancel is Cancel.AfterCloseSendMs) { + delay(cancel.millis.toLong()) + stream.close() + } + streamResult(0, stream, cancel) + } } catch (ex: Throwable) { - val connEx = if (ex is ConnectException) { - ex - } else { - ConnectException( - code = Code.UNKNOWN, - message = ex.message, - exception = ex, + if (!sent) { + val connEx = if (ex is ConnectException) { + ex + } else { + ConnectException( + code = Code.UNKNOWN, + message = ex.message, + exception = ex, + ) + } + return ClientResponseResult( + error = connEx, + numUnsentRequests = 1, ) } - return ClientResponseResult( - error = connEx, - numUnsentRequests = 1, - ) - } - try { - val cancel = req.cancel - if (cancel is Cancel.AfterCloseSendMs) { - delay(cancel.millis.toLong()) - stream.close() - } - return streamResult(0, stream, cancel) - } finally { - stream.close() + throw ex } } @@ -272,8 +266,7 @@ class Client( client: BidiStreamClient, req: ClientCompatRequest, ): ClientResponseResult { - val stream = client.execute(req.requestHeaders) - try { + return client.execute(req.requestHeaders) { stream -> var numUnsent = 0 for (i in req.requestMessages.indices) { if (req.requestDelayMs > 0) { @@ -294,21 +287,19 @@ class Client( val cancel = req.cancel when (cancel) { is Cancel.BeforeCloseSend -> { - stream.responses.close() // cancel + stream.close() // cancel stream.requests.close() // close send } is Cancel.AfterCloseSendMs -> { stream.requests.close() // close send delay(cancel.millis.toLong()) - stream.responses.close() // cancel + stream.close() // cancel } else -> { stream.requests.close() // close send } } - return streamResult(numUnsent, stream.responses, cancel) - } finally { - stream.responses.close() + streamResult(numUnsent, stream.responses, cancel) } } @@ -316,8 +307,7 @@ class Client( client: BidiStreamClient, req: ClientCompatRequest, ): ClientResponseResult { - val stream = client.execute(req.requestHeaders) - try { + return client.execute(req.requestHeaders) { stream -> val cancel = req.cancel val payloads: MutableList = mutableListOf() for (i in req.requestMessages.indices) { @@ -338,16 +328,16 @@ class Client( // In full-duplex mode, we read the response after writing request, // to interleave the requests and responses. if (i == 0 && cancel is Cancel.AfterNumResponses && cancel.num == 0) { - stream.responses.close() + stream.close() } try { val resp = stream.responses.messages.receive() payloads.add(payloadExtractor(resp)) if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) { - stream.responses.close() + stream.close() } } catch (ex: ConnectException) { - return ClientResponseResult( + return@execute ClientResponseResult( headers = stream.responses.headers(), payloads = payloads, error = ex, @@ -358,13 +348,13 @@ class Client( } when (cancel) { is Cancel.BeforeCloseSend -> { - stream.responses.close() // cancel + stream.close() // cancel stream.requests.close() // close send } is Cancel.AfterCloseSendMs -> { stream.requests.close() // close send delay(cancel.millis.toLong()) - stream.responses.close() // cancel + stream.close() // cancel } else -> { stream.requests.close() // close send @@ -378,7 +368,7 @@ class Client( for (resp in stream.responses.messages) { payloads.add(payloadExtractor(resp)) if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) { - stream.responses.close() + stream.close() } } trailers = stream.responses.trailers() @@ -386,14 +376,12 @@ class Client( connEx = ex trailers = ex.metadata } - return ClientResponseResult( + ClientResponseResult( headers = stream.responses.headers(), payloads = payloads, error = connEx, trailers = trailers, ) - } finally { - stream.responses.close() } } diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt index 0e372c40..44442b35 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt @@ -43,9 +43,10 @@ abstract class BidiStreamClient( * @param Req The request message type * @param Resp The response message type */ - interface BidiStream { + interface BidiStream : Closeable { val requests: RequestStream val responses: ResponseStream + companion object { fun new(underlying: BidirectionalStreamInterface): BidiStream { val reqStream = RequestStream.new(underlying) @@ -56,8 +57,25 @@ abstract class BidiStreamClient( override val responses: ResponseStream get() = respStream + + override suspend fun close() { + responses.close() + } } } } } } + +/** + * Executes the bidirectional-stream call inside the given block. + * The block is used to send requests and receive responses. The + * stream is automatically closed when the block returns or throws. + */ +suspend fun BidiStreamClient.execute( + headers: Headers, + block: suspend (BidiStreamClient.BidiStream) -> R, +): R { + val stream = execute(headers) + return stream.use(block) +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt index 423cbc87..aa875e21 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt @@ -43,10 +43,9 @@ abstract class ClientStreamClient( * @param Req The request message type * @param Resp The response message type */ - interface ClientStream { + interface ClientStream : Closeable { suspend fun send(req: Req) suspend fun closeAndReceive(): ResponseMessage - suspend fun cancel() companion object { fun new(underlying: ClientOnlyStreamInterface): ClientStream { @@ -83,7 +82,7 @@ abstract class ClientStreamClient( } } - override suspend fun cancel() { + override suspend fun close() { underlying.cancel() } } @@ -91,3 +90,16 @@ abstract class ClientStreamClient( } } } + +/** + * Executes the client-stream call inside the given block. The block + * is used to send the requests and then retrieve the responses. The + * stream is automatically closed when the block returns or throws. + */ +suspend fun ClientStreamClient.execute( + headers: Headers, + block: suspend (ClientStreamClient.ClientStream) -> R, +): R { + val stream = execute(headers) + return stream.use(block) +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Closeable.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Closeable.kt new file mode 100644 index 00000000..a32ac63a --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Closeable.kt @@ -0,0 +1,43 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +// Like java.io.Closeable, but the close operation is suspendable. +interface Closeable { + suspend fun close() +} + +// Like the standard kotlin "use" extension function, but uses +// a suspending Closeable instead of java.io.Closeable and accepts +// a suspending block. +internal suspend fun T.use(block: suspend (T) -> R): R { + var exception: Throwable? = null + try { + return block(this) + } catch (ex: Throwable) { + exception = ex + throw exception + } finally { + try { + this.close() + } catch (ex: Throwable) { + if (exception != null) { + exception.addSuppressed(ex) + } else { + throw ex + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt index 30986914..9be05b4d 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt @@ -17,8 +17,8 @@ package com.connectrpc.conformance.client.adapt /** * An RPC stub that allows for invoking RPC methods. * Each method of Invoker corresponds to an RPC method - * and returns a client stub that can be used to actually - * invoke that RPC. + * of the conformance service and returns a client + * object that can be used to actually invoke that RPC. */ interface Invoker { fun unaryClient(): UnaryClient<*, *> diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt index a2421652..3a889f6b 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt @@ -21,10 +21,20 @@ import com.google.protobuf.MessageLite * RequestStream is a stream that allows a client to upload * zero or more request messages. When the client is done * sending messages, it must close the stream. + * + * Note that closing the request stream is not strictly + * required if the RPC is cancelled or fails prematurely + * or if the response stream is closed first. Closing the + * requests "half-closes" the stream; closing the responses + * "fully closes" it. */ -interface RequestStream { +interface RequestStream : Closeable { + /** + * Sends a message on the stream. + * @throws Exception when the request cannot be sent + * because of an error with the streaming call + */ suspend fun send(req: Req) - fun close() companion object { fun new(underlying: BidirectionalStreamInterface): RequestStream { @@ -36,7 +46,7 @@ interface RequestStream { } } - override fun close() { + override suspend fun close() { underlying.sendClose() } } diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt index 9a17dc75..97fed9e7 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt @@ -29,15 +29,11 @@ import kotlinx.coroutines.channels.ReceiveChannel * * @param Resp The response message type */ -interface ResponseStream { +interface ResponseStream : Closeable { val messages: ReceiveChannel - suspend fun headers(): Headers - suspend fun trailers(): Headers - fun close() - companion object { fun new(underlying: BidirectionalStreamInterface): ResponseStream { return object : ResponseStream { @@ -52,7 +48,7 @@ interface ResponseStream { return underlying.responseTrailers().await() } - override fun close() { + override suspend fun close() { underlying.receiveClose() } } @@ -71,7 +67,7 @@ interface ResponseStream { return underlying.responseTrailers().await() } - override fun close() { + override suspend fun close() { underlying.receiveClose() } } diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt index 621b4776..73df2dce 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt @@ -31,3 +31,17 @@ abstract class ServerStreamClient( ) { abstract suspend fun execute(req: Req, headers: Headers): ResponseStream } + +/** + * Executes the server-stream call inside the given block. The block + * is used to consume the responses. The stream is automatically closed + * when the block returns or throws. + */ +suspend fun ServerStreamClient.execute( + req: Req, + headers: Headers, + block: suspend (ResponseStream) -> R, +): R { + val stream = execute(req, headers) + return stream.use(block) +} From 52e0ad3f16004026e9bde8b0d5073605b4e71307 Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Thu, 1 Feb 2024 19:59:37 -0500 Subject: [PATCH 2/3] promote from extension function to method, make non-block method protected, make block use CoroutineScope as this for easy access to launch and async --- .../connectrpc/conformance/client/Client.kt | 6 ++-- .../client/adapt/BidiStreamClient.kt | 32 +++++++++++-------- .../client/adapt/ClientStreamClient.kt | 32 +++++++++++-------- .../client/adapt/ServerStreamClient.kt | 32 +++++++++++-------- 4 files changed, 56 insertions(+), 46 deletions(-) diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt index 1dd417a1..a17a721c 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt @@ -36,14 +36,12 @@ import com.connectrpc.conformance.client.adapt.Invoker import com.connectrpc.conformance.client.adapt.ResponseStream import com.connectrpc.conformance.client.adapt.ServerStreamClient import com.connectrpc.conformance.client.adapt.UnaryClient -import com.connectrpc.conformance.client.adapt.execute import com.connectrpc.http.HTTPClientInterface import com.connectrpc.impl.ProtocolClient import com.connectrpc.okhttp.ConnectOkHttpClient import com.connectrpc.protocols.GETConfiguration import com.google.protobuf.MessageLite import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import okhttp3.OkHttpClient @@ -153,7 +151,7 @@ class Client( private suspend fun handleClient( client: ClientStreamClient, req: ClientCompatRequest, - ): ClientResponseResult = coroutineScope { + ): ClientResponseResult { if (req.streamType != StreamType.CLIENT_STREAM) { throw RuntimeException("specified method ${req.method} is client-stream but stream type indicates ${req.streamType}") } @@ -163,7 +161,7 @@ class Client( ) { throw RuntimeException("client stream calls can only support `BeforeCloseSend` and 'AfterCloseSendMs' cancellation field, instead got ${req.cancel!!::class.simpleName}") } - client.execute(req.requestHeaders) { stream -> + return client.execute(req.requestHeaders) { stream -> var numUnsent = 0 for (i in req.requestMessages.indices) { if (req.requestDelayMs > 0) { diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt index 44442b35..5871510c 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt @@ -17,6 +17,8 @@ package com.connectrpc.conformance.client.adapt import com.connectrpc.BidirectionalStreamInterface import com.connectrpc.Headers import com.google.protobuf.MessageLite +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope /** * The client of a bidi-stream RPC operation. A bidi-stream @@ -35,7 +37,22 @@ abstract class BidiStreamClient( val reqTemplate: Req, val respTemplate: Resp, ) { - abstract suspend fun execute(headers: Headers): BidiStream + /** + * Executes the bidirectional-stream call inside the given block. + * The block is used to send requests and receive responses. The + * stream is automatically closed when the block returns or throws. + */ + suspend fun execute( + headers: Headers, + block: suspend CoroutineScope.(BidiStream) -> R, + ): R { + val stream = execute(headers) + return stream.use { + coroutineScope { block(this, it) } + } + } + + protected abstract suspend fun execute(headers: Headers): BidiStream /** * A BidiStream combines a request stream and a response stream. @@ -66,16 +83,3 @@ abstract class BidiStreamClient( } } } - -/** - * Executes the bidirectional-stream call inside the given block. - * The block is used to send requests and receive responses. The - * stream is automatically closed when the block returns or throws. - */ -suspend fun BidiStreamClient.execute( - headers: Headers, - block: suspend (BidiStreamClient.BidiStream) -> R, -): R { - val stream = execute(headers) - return stream.use(block) -} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt index aa875e21..f69b1d10 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt @@ -20,6 +20,8 @@ import com.connectrpc.ConnectException import com.connectrpc.Headers import com.connectrpc.ResponseMessage import com.google.protobuf.MessageLite +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope /** * The client of a client-stream RPC operation. A client-stream @@ -34,7 +36,22 @@ abstract class ClientStreamClient( val reqTemplate: Req, val respTemplate: Resp, ) { - abstract suspend fun execute(headers: Headers): ClientStream + /** + * Executes the client-stream call inside the given block. The block + * is used to send the requests and then retrieve the responses. The + * stream is automatically closed when the block returns or throws. + */ + suspend fun execute( + headers: Headers, + block: suspend CoroutineScope.(ClientStream) -> R, + ): R { + val stream = execute(headers) + return stream.use { + coroutineScope { block(this, it) } + } + } + + protected abstract suspend fun execute(headers: Headers): ClientStream /** * A ClientStream is just like a RequestStream, except that closing @@ -90,16 +107,3 @@ abstract class ClientStreamClient( } } } - -/** - * Executes the client-stream call inside the given block. The block - * is used to send the requests and then retrieve the responses. The - * stream is automatically closed when the block returns or throws. - */ -suspend fun ClientStreamClient.execute( - headers: Headers, - block: suspend (ClientStreamClient.ClientStream) -> R, -): R { - val stream = execute(headers) - return stream.use(block) -} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt index 73df2dce..1b472f7e 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt @@ -16,6 +16,8 @@ package com.connectrpc.conformance.client.adapt import com.connectrpc.Headers import com.google.protobuf.MessageLite +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope /** * The client of a server-stream RPC operation. A server-stream @@ -29,19 +31,21 @@ abstract class ServerStreamClient( val reqTemplate: Req, val respTemplate: Resp, ) { - abstract suspend fun execute(req: Req, headers: Headers): ResponseStream -} + /** + * Executes the server-stream call inside the given block. The block + * is used to consume the responses. The stream is automatically closed + * when the block returns or throws. + */ + suspend fun execute( + req: Req, + headers: Headers, + block: suspend CoroutineScope.(ResponseStream) -> R, + ): R { + val stream = execute(req, headers) + return stream.use { + coroutineScope { block(this, it) } + } + } -/** - * Executes the server-stream call inside the given block. The block - * is used to consume the responses. The stream is automatically closed - * when the block returns or throws. - */ -suspend fun ServerStreamClient.execute( - req: Req, - headers: Headers, - block: suspend (ResponseStream) -> R, -): R { - val stream = execute(req, headers) - return stream.use(block) + protected abstract suspend fun execute(req: Req, headers: Headers): ResponseStream } From 419d9ac14560273a4560741538d0335ba0a76bb6 Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Mon, 5 Feb 2024 17:13:10 -0500 Subject: [PATCH 3/3] review feedback: rename it SuspendCloseable --- .../connectrpc/conformance/client/adapt/BidiStreamClient.kt | 2 +- .../connectrpc/conformance/client/adapt/ClientStreamClient.kt | 2 +- .../com/connectrpc/conformance/client/adapt/RequestStream.kt | 2 +- .../com/connectrpc/conformance/client/adapt/ResponseStream.kt | 2 +- .../client/adapt/{Closeable.kt => SuspendCloseable.kt} | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) rename conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/{Closeable.kt => SuspendCloseable.kt} (92%) diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt index 5871510c..f8458080 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt @@ -60,7 +60,7 @@ abstract class BidiStreamClient( * @param Req The request message type * @param Resp The response message type */ - interface BidiStream : Closeable { + interface BidiStream : SuspendCloseable { val requests: RequestStream val responses: ResponseStream diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt index f69b1d10..81dd11b6 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt @@ -60,7 +60,7 @@ abstract class ClientStreamClient( * @param Req The request message type * @param Resp The response message type */ - interface ClientStream : Closeable { + interface ClientStream : SuspendCloseable { suspend fun send(req: Req) suspend fun closeAndReceive(): ResponseMessage diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt index 3a889f6b..a64b8f11 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt @@ -28,7 +28,7 @@ import com.google.protobuf.MessageLite * requests "half-closes" the stream; closing the responses * "fully closes" it. */ -interface RequestStream : Closeable { +interface RequestStream : SuspendCloseable { /** * Sends a message on the stream. * @throws Exception when the request cannot be sent diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt index 97fed9e7..659c7a78 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt @@ -29,7 +29,7 @@ import kotlinx.coroutines.channels.ReceiveChannel * * @param Resp The response message type */ -interface ResponseStream : Closeable { +interface ResponseStream : SuspendCloseable { val messages: ReceiveChannel suspend fun headers(): Headers suspend fun trailers(): Headers diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Closeable.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/SuspendCloseable.kt similarity index 92% rename from conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Closeable.kt rename to conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/SuspendCloseable.kt index a32ac63a..b5d364d8 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Closeable.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/SuspendCloseable.kt @@ -15,14 +15,14 @@ package com.connectrpc.conformance.client.adapt // Like java.io.Closeable, but the close operation is suspendable. -interface Closeable { +interface SuspendCloseable { suspend fun close() } // Like the standard kotlin "use" extension function, but uses // a suspending Closeable instead of java.io.Closeable and accepts // a suspending block. -internal suspend fun T.use(block: suspend (T) -> R): R { +internal suspend fun T.use(block: suspend (T) -> R): R { var exception: Throwable? = null try { return block(this)