Skip to content

Commit

Permalink
Add blocking unary generation (#50)
Browse files Browse the repository at this point in the history
Providing options for generating synchronous methods for unary RPC
calls:
```kotlin
        val response = testServiceConnectClient.unaryCallBlocking(message)
        assertThat(response.code).isEqualTo(Code.UNKNOWN)
        response.failure { errorResponse ->
            assertThat(errorResponse.error).isNotNull()
            assertThat(errorResponse.code).isEqualTo(Code.UNKNOWN)
            assertThat(errorResponse.error.message).isEqualTo("test status message")
        }
        response.success {
            fail<Unit>("unexpected success")
        }
```
  • Loading branch information
buildbreaker authored Aug 11, 2023
1 parent a16ff78 commit 2e45257
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 6 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ is available on the [connect.build website][getting-started].

## Generation Options

| **Option** | **Type** | **Default** | **Repeatable** | **Details** |
|----------------------------|:--------:|:-----------:|:--------------:|-------------------------------------------------|
| `generateCallbackMethods` | Boolean | `false` | No | Generate callback signatures for unary methods. |
| `generateCoroutineMethods` | Boolean | `true` | No | Generate suspend signatures for unary methods. |
| **Option** | **Type** | **Default** | **Details** |
|--------------------------------|:--------:|:-----------:|-------------------------------------------------|
| `generateCallbackMethods` | Boolean | `false` | Generate callback signatures for unary methods. |
| `generateCoroutineMethods` | Boolean | `true` | Generate suspend signatures for unary methods. |
| `generateBlockingUnaryMethods` | Boolean | `false` | Generate blocking signatures for unary methods. |

## Example Apps

Expand Down
1 change: 1 addition & 0 deletions crosstests/buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ plugins:
opt:
- generateCallbackMethods=true
- generateCoroutineMethods=true
- generateBlockingUnaryMethods=true
- name: java
out: google-java/src/main/java/generated
- name: kotlin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class CrossTest(
private lateinit var shortTimeoutConnectClient: ProtocolClient
private lateinit var unimplementedServiceClient: UnimplementedServiceClient
private lateinit var testServiceConnectClient: TestServiceClient

companion object {
@JvmStatic
@Parameters(name = "protocol")
Expand Down Expand Up @@ -376,6 +377,138 @@ class CrossTest(
assertThat(countDownLatch.count).isZero()
}

@Test
fun emptyUnaryBlocking(): Unit = runBlocking {
val response = testServiceConnectClient.emptyCallBlocking(empty {}).execute()
response.failure {
fail<Unit>("expected error to be null")
}
response.success { success ->
assertThat(success.message).isEqualTo(empty {})
}
}

@Test
fun largeUnaryBlocking(): Unit = runBlocking {
val size = 314159
val message = simpleRequest {
responseSize = size
payload = payload {
body = ByteString.copyFrom(ByteArray(size))
}
}
val response = testServiceConnectClient.unaryCallBlocking(message).execute()
response.failure {
fail<Unit>("expected error to be null")
}
response.success { success ->
assertThat(success.message.payload?.body?.toByteArray()?.size).isEqualTo(size)
}
}

@Test
fun customMetadataBlocking(): Unit = runBlocking {
val size = 314159
val leadingKey = "x-grpc-test-echo-initial"
val leadingValue = "test_initial_metadata_value"
val trailingKey = "x-grpc-test-echo-trailing-bin"
val trailingValue = byteArrayOf(0xab.toByte(), 0xab.toByte(), 0xab.toByte())
val headers =
mapOf(
leadingKey to listOf(leadingValue),
trailingKey to listOf(b64Encode(trailingValue))
)
val message = simpleRequest {
responseSize = size
payload = payload { body = ByteString.copyFrom(ByteArray(size)) }
}
val response = testServiceConnectClient.unaryCallBlocking(message, headers).execute()
assertThat(response.code).isEqualTo(Code.OK)
assertThat(response.headers[leadingKey]).containsExactly(leadingValue)
assertThat(response.trailers[trailingKey]).containsExactly(b64Encode(trailingValue))
response.failure {
fail<Unit>("expected error to be null")
}
response.success { success ->
assertThat(success.message.payload!!.body!!.size()).isEqualTo(size)
}
}

@Test
fun statusCodeAndMessageBlocking(): Unit = runBlocking {
val message = simpleRequest {
responseStatus = echoStatus {
code = Code.UNKNOWN.value
message = "test status message"
}
}
val response = testServiceConnectClient.unaryCallBlocking(message).execute()
assertThat(response.code).isEqualTo(Code.UNKNOWN)
response.failure { errorResponse ->
assertThat(errorResponse.error).isNotNull()
assertThat(errorResponse.code).isEqualTo(Code.UNKNOWN)
assertThat(errorResponse.error.message).isEqualTo("test status message")
}
response.success {
fail<Unit>("unexpected success")
}
}

@Test
fun specialStatusBlocking(): Unit = runBlocking {
val statusMessage =
"\\t\\ntest with whitespace\\r\\nand Unicode BMP ☺ and non-BMP \uD83D\uDE08\\t\\n"
val response = testServiceConnectClient.unaryCallBlocking(
simpleRequest {
responseStatus = echoStatus {
code = 2
message = statusMessage
}
}
).execute()
response.failure { errorResponse ->
val error = errorResponse.error
assertThat(error.code).isEqualTo(Code.UNKNOWN)
assertThat(response.code).isEqualTo(Code.UNKNOWN)
assertThat(error.message).isEqualTo(statusMessage)
}
response.success {
fail<Unit>("unexpected success")
}
}

@Test
fun unimplementedMethodBlocking(): Unit = runBlocking {
val response = testServiceConnectClient.unimplementedCallBlocking(empty {}).execute()
assertThat(response.code).isEqualTo(Code.UNIMPLEMENTED)
}

@Test
fun unimplementedServiceBlocking(): Unit = runBlocking {
val response = unimplementedServiceClient.unimplementedCallBlocking(empty {}).execute()
assertThat(response.code).isEqualTo(Code.UNIMPLEMENTED)
}

@Test
fun failUnaryBlocking(): Unit = runBlocking {
val expectedErrorDetail = errorDetail {
reason = "soirée 🎉"
domain = "connect-crosstest"
}
val response = testServiceConnectClient.failUnaryCallBlocking(simpleRequest {}).execute()
assertThat(response.code).isEqualTo(Code.RESOURCE_EXHAUSTED)
response.failure { errorResponse ->
val error = errorResponse.error
assertThat(error.code).isEqualTo(Code.RESOURCE_EXHAUSTED)
assertThat(error.message).isEqualTo("soirée 🎉")
val connectErrorDetails = error.unpackedDetails(ErrorDetail::class)
assertThat(connectErrorDetails).containsExactly(expectedErrorDetail)
}
response.success {
fail<Unit>("unexpected success")
}
}

private fun b64Encode(trailingValue: ByteArray): String {
return String(Base64.getEncoder().encode(trailingValue))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ interface ProtocolClientInterface {
methodSpec: MethodSpec<Input, Output>
): ResponseMessage<Output>

/**
* Perform a synchronous unary (non-streaming) request.
*
* @param request The outbound request message.
* @param headers The outbound request headers to include.
* @param methodSpec The Method for RPC path.
*
* @return The [UnaryBlockingCall] for the unary request.
*/
fun <Input : Any, Output : Any> unaryBlocking(
request: Input,
headers: Headers,
methodSpec: MethodSpec<Input, Output>
): UnaryBlockingCall<Output>

/**
* Start a new bidirectional stream.
*
Expand Down
70 changes: 70 additions & 0 deletions library/src/main/kotlin/build/buf/connect/UnaryBlockingCall.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2022-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package build.buf.connect

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

/**
* A [UnaryBlockingCall] contains the way to make a blocking RPC call and cancelling the RPC.
*/
class UnaryBlockingCall<Output> {
private var executable: ((ResponseMessage<Output>) -> Unit) -> Unit = { }
private var cancel: () -> Unit = { }

/**
* Execute the underlying request.
* Subsequent calls will create a new request.
*/
fun execute(): ResponseMessage<Output> {
val countDownLatch = CountDownLatch(1)
val reference = AtomicReference<ResponseMessage<Output>>()
executable { responseMessage ->
reference.set(responseMessage)
countDownLatch.countDown()
}
countDownLatch.await()
return reference.get()
}

/**
* Cancel the underlying request.
*/
fun cancel() {
cancel()
}

/**
* Gives the blocking call a cancellation function to cancel the
* underlying request.
*
* @param cancel The function to call in order to cancel the
* underlying request.
*/
internal fun setCancel(cancel: () -> Unit) {
this.cancel = cancel
}

/**
* Gives the blocking call the execution function to initiate
* the underlying request.
*
* @param executable The function to call in order to initiate
* a request.
*/
internal fun setExecute(executable: ((ResponseMessage<Output>) -> Unit) -> Unit) {
this.executable = executable
}
}
21 changes: 21 additions & 0 deletions library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import build.buf.connect.ProtocolClientInterface
import build.buf.connect.ResponseMessage
import build.buf.connect.ServerOnlyStreamInterface
import build.buf.connect.StreamResult
import build.buf.connect.UnaryBlockingCall
import build.buf.connect.http.Cancelable
import build.buf.connect.http.HTTPClientInterface
import build.buf.connect.http.HTTPRequest
Expand All @@ -32,6 +33,7 @@ import build.buf.connect.protocols.GETConfiguration
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.suspendCancellableCoroutine
import java.net.URL
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.resume

/**
Expand Down Expand Up @@ -117,6 +119,25 @@ class ProtocolClient(
}
}

override fun <Input : Any, Output : Any> unaryBlocking(
request: Input,
headers: Headers,
methodSpec: MethodSpec<Input, Output>
): UnaryBlockingCall<Output> {
val countDownLatch = CountDownLatch(1)
val call = UnaryBlockingCall<Output>()
// Set the unary synchronous executable.
call.setExecute { callback: (ResponseMessage<Output>) -> Unit ->
val cancellationFn = unary(request, headers, methodSpec) { responseMessage ->
callback(responseMessage)
countDownLatch.countDown()
}
// Set the cancellation function .
call.setCancel(cancellationFn)
}
return call
}

override suspend fun <Input : Any, Output : Any> stream(
headers: Headers,
methodSpec: MethodSpec<Input, Output>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import build.buf.connect.MethodSpec
import build.buf.connect.ProtocolClientInterface
import build.buf.connect.ResponseMessage
import build.buf.connect.ServerOnlyStreamInterface
import build.buf.connect.UnaryBlockingCall
import build.buf.protocgen.connect.internal.CodeGenerator
import build.buf.protocgen.connect.internal.Configuration
import build.buf.protocgen.connect.internal.Plugin
Expand Down Expand Up @@ -221,6 +222,16 @@ class Generator : CodeGenerator {
.build()
functions.add(unaryCallbackFunction)
}
if (configuration.generateBlockingUnaryMethods) {
val unarySuspendFunction = FunSpec.builder("${method.name.lowerCamelCase()}Blocking")
.addKdoc(sourceInfo.comment().sanitizeKdoc())
.addModifiers(KModifier.ABSTRACT)
.addParameter("request", inputClassName)
.addParameter(headerParameterSpec)
.returns(UnaryBlockingCall::class.asClassName().parameterizedBy(outputClassName))
.build()
functions.add(unarySuspendFunction)
}
}
}
return functions
Expand Down Expand Up @@ -404,6 +415,28 @@ class Generator : CodeGenerator {
.build()
functions.add(unaryCallbackFunction)
}
if (configuration.generateBlockingUnaryMethods) {
val unarySuspendFunction = FunSpec.builder("${method.name.lowerCamelCase()}Blocking")
.addKdoc(sourceInfo.comment().sanitizeKdoc())
.addModifiers(KModifier.OVERRIDE)
.addParameter("request", inputClassName)
.addParameter("headers", HEADERS_CLASS_NAME)
.returns(UnaryBlockingCall::class.asClassName().parameterizedBy(outputClassName))
.addStatement(
"return %L",
CodeBlock.builder()
.addStatement("client.unaryBlocking(")
.indent()
.addStatement("request,")
.addStatement("headers,")
.add(methodSpecCallBlock)
.unindent()
.addStatement(")")
.build()
)
.build()
functions.add(unarySuspendFunction)
}
}
}
return functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package build.buf.protocgen.connect.internal

internal const val CALLBACK_SIGNATURE = "generateCallbackMethods"
internal const val COROUTINE_SIGNATURE = "generateCoroutineMethods"
internal const val BLOCKING_UNARY_SIGNATURE = "generateBlockingUnaryMethods"

/**
* The protoc plugin configuration class representation.
Expand All @@ -24,7 +25,9 @@ internal data class Configuration(
// Enable or disable callback signature generation.
val generateCallbackMethods: Boolean,
// Enable or disable coroutine signature generation.
val generateCoroutineMethods: Boolean
val generateCoroutineMethods: Boolean,
// Enable or disable blocking unary signature generation.
val generateBlockingUnaryMethods: Boolean
)

/**
Expand All @@ -39,6 +42,7 @@ internal fun parse(input: String): Configuration {
val parameters = parseGeneratorParameter(input)
return Configuration(
generateCallbackMethods = parameters[CALLBACK_SIGNATURE]?.toBoolean() ?: false,
generateCoroutineMethods = parameters[COROUTINE_SIGNATURE]?.toBoolean() ?: true // Defaulted to true.
generateCoroutineMethods = parameters[COROUTINE_SIGNATURE]?.toBoolean() ?: true, // Defaulted to true.
generateBlockingUnaryMethods = parameters[BLOCKING_UNARY_SIGNATURE]?.toBoolean() ?: false
)
}

0 comments on commit 2e45257

Please sign in to comment.