diff --git a/library/src/main/kotlin/com/connectrpc/UnaryBlockingCall.kt b/library/src/main/kotlin/com/connectrpc/UnaryBlockingCall.kt index b27c55ce..f69701d3 100644 --- a/library/src/main/kotlin/com/connectrpc/UnaryBlockingCall.kt +++ b/library/src/main/kotlin/com/connectrpc/UnaryBlockingCall.kt @@ -14,57 +14,18 @@ package com.connectrpc -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicReference - /** * A [UnaryBlockingCall] contains the way to make a blocking RPC call and cancelling the RPC. */ -class UnaryBlockingCall { - private var executable: ((ResponseMessage) -> Unit) -> Unit = { } - private var cancelFn: () -> Unit = { } - +interface UnaryBlockingCall { /** - * Execute the underlying request. - * Subsequent calls will create a new request. + * Execute the underlying request. Can only be called once. + * Subsequent calls will throw IllegalStateException. */ - fun execute(): ResponseMessage { - val countDownLatch = CountDownLatch(1) - val reference = AtomicReference>() - executable { responseMessage -> - reference.set(responseMessage) - countDownLatch.countDown() - } - countDownLatch.await() - return reference.get() - } + fun execute(): ResponseMessage /** * Cancel the underlying request. */ - fun cancel() { - cancelFn() - } - - /** - * Gives the blocking call a cancellation function to cancel the - * underlying request. - * - * @param cancel The function to call in order to cancel the - * underlying request. - */ - internal fun setCancel(cancel: () -> Unit) { - this.cancelFn = cancel - } - - /** - * Gives the blocking call the execution function to initiate - * the underlying request. - * - * @param executable The function to call in order to initiate - * a request. - */ - internal fun setExecute(executable: ((ResponseMessage) -> Unit) -> Unit) { - this.executable = executable - } + fun cancel() } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt index 909b5be8..6a00be13 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt @@ -40,7 +40,6 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import java.net.URI -import java.util.concurrent.CountDownLatch import kotlin.coroutines.resume /** @@ -170,18 +169,9 @@ class ProtocolClient( headers: Headers, methodSpec: MethodSpec, ): UnaryBlockingCall { - val countDownLatch = CountDownLatch(1) - val call = UnaryBlockingCall() - // Set the unary synchronous executable. - call.setExecute { callback: (ResponseMessage) -> Unit -> - val cancellationFn = unary(request, headers, methodSpec) { responseMessage -> - callback(responseMessage) - countDownLatch.countDown() - } - // Set the cancellation function . - call.setCancel(cancellationFn) + return UnaryCall { callback -> + unary(request, headers, methodSpec, callback) } - return call } override suspend fun serverStream( diff --git a/library/src/main/kotlin/com/connectrpc/impl/UnaryCall.kt b/library/src/main/kotlin/com/connectrpc/impl/UnaryCall.kt new file mode 100644 index 00000000..32f21dc0 --- /dev/null +++ b/library/src/main/kotlin/com/connectrpc/impl/UnaryCall.kt @@ -0,0 +1,70 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.impl + +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.http.Cancelable +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference + +/** + * Concrete implementation of [UnaryBlockingCall]. + */ +class UnaryCall( + private val block: ((ResponseMessage) -> Unit) -> Cancelable, +) : UnaryBlockingCall { + private val executed = AtomicBoolean() + + /** + * initialized to null and then replaced with non-null + * function when [execute] or [cancel] is called. + */ + private var cancelFunc = AtomicReference() + + /** + * Execute the underlying request. + */ + override fun execute(): ResponseMessage { + check(executed.compareAndSet(false, true)) { "already executed" } + + val resultReady = CountDownLatch(1) + val result = AtomicReference>() + val cancelFn = block { responseMessage -> + result.set(responseMessage) + resultReady.countDown() + } + + if (!cancelFunc.compareAndSet(null, cancelFn)) { + // concurrently cancelled before we could set the + // cancel function, so we need to cancel what we + // just started + cancelFn() + } + resultReady.await() + return result.get() + } + + /** + * Cancel the underlying request. + */ + override fun cancel() { + val cancelFn = cancelFunc.getAndSet {} // set to (non-null) no-op + if (cancelFn != null) { + cancelFn() + } + } +}