Skip to content

Commit

Permalink
Simplify UnaryBlockingCall
Browse files Browse the repository at this point in the history
- It's now just an interface, and the implementation is
  in the impl package (just like the stream interfaces).
- The implementation is simpler and thread-safe.
- No longer supports calling it more than once. This was
  incorrectly implemented before since it became impossible
  to cancel earlier invocations (and was not thread-safe).
  Much simpler to just not allow this than to fix it.
  • Loading branch information
jhump committed Feb 22, 2024
1 parent 932a878 commit 6e86012
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 56 deletions.
49 changes: 5 additions & 44 deletions library/src/main/kotlin/com/connectrpc/UnaryBlockingCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,18 @@

package com.connectrpc

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

/**
* A [UnaryBlockingCall] contains the way to make a blocking RPC call and cancelling the RPC.
*/
class UnaryBlockingCall<Output> {
private var executable: ((ResponseMessage<Output>) -> Unit) -> Unit = { }
private var cancelFn: () -> Unit = { }

interface UnaryBlockingCall<Output> {
/**
* Execute the underlying request.
* Subsequent calls will create a new request.
* Execute the underlying request. Can only be called once.
* Subsequent calls will throw IllegalStateException.
*/
fun execute(): ResponseMessage<Output> {
val countDownLatch = CountDownLatch(1)
val reference = AtomicReference<ResponseMessage<Output>>()
executable { responseMessage ->
reference.set(responseMessage)
countDownLatch.countDown()
}
countDownLatch.await()
return reference.get()
}
fun execute(): ResponseMessage<Output>

/**
* Cancel the underlying request.
*/
fun cancel() {
cancelFn()
}

/**
* Gives the blocking call a cancellation function to cancel the
* underlying request.
*
* @param cancel The function to call in order to cancel the
* underlying request.
*/
internal fun setCancel(cancel: () -> Unit) {
this.cancelFn = cancel
}

/**
* Gives the blocking call the execution function to initiate
* the underlying request.
*
* @param executable The function to call in order to initiate
* a request.
*/
internal fun setExecute(executable: ((ResponseMessage<Output>) -> Unit) -> Unit) {
this.executable = executable
}
fun cancel()
}
14 changes: 2 additions & 12 deletions library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import java.net.URI
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.resume

/**
Expand Down Expand Up @@ -170,18 +169,9 @@ class ProtocolClient(
headers: Headers,
methodSpec: MethodSpec<Input, Output>,
): UnaryBlockingCall<Output> {
val countDownLatch = CountDownLatch(1)
val call = UnaryBlockingCall<Output>()
// Set the unary synchronous executable.
call.setExecute { callback: (ResponseMessage<Output>) -> Unit ->
val cancellationFn = unary(request, headers, methodSpec) { responseMessage ->
callback(responseMessage)
countDownLatch.countDown()
}
// Set the cancellation function .
call.setCancel(cancellationFn)
return UnaryCall { callback ->
unary(request, headers, methodSpec, callback)
}
return call
}

override suspend fun <Input : Any, Output : Any> serverStream(
Expand Down
70 changes: 70 additions & 0 deletions library/src/main/kotlin/com/connectrpc/impl/UnaryCall.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2022-2023 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.connectrpc.impl

import com.connectrpc.ResponseMessage
import com.connectrpc.UnaryBlockingCall
import com.connectrpc.http.Cancelable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

/**
* Concrete implementation of [UnaryBlockingCall].
*/
class UnaryCall<Output>(
private val block: ((ResponseMessage<Output>) -> Unit) -> Cancelable,
) : UnaryBlockingCall<Output> {
private val executed = AtomicBoolean()

/**
* initialized to null and then replaced with non-null
* function when [execute] or [cancel] is called.
*/
private var cancelFunc = AtomicReference<Cancelable>()

/**
* Execute the underlying request.
*/
override fun execute(): ResponseMessage<Output> {
check(executed.compareAndSet(false, true)) { "already executed" }

val resultReady = CountDownLatch(1)
val result = AtomicReference<ResponseMessage<Output>>()
val cancelFn = block { responseMessage ->
result.set(responseMessage)
resultReady.countDown()
}

if (!cancelFunc.compareAndSet(null, cancelFn)) {
// concurrently cancelled before we could set the
// cancel function, so we need to cancel what we
// just started
cancelFn()
}
resultReady.await()
return result.get()
}

/**
* Cancel the underlying request.
*/
override fun cancel() {
val cancelFn = cancelFunc.getAndSet {} // set to (non-null) no-op
if (cancelFn != null) {
cancelFn()
}
}
}

0 comments on commit 6e86012

Please sign in to comment.