diff --git a/Makefile b/Makefile index be856766..cf81496f 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ BIN := .tmp/bin CACHE := .tmp/cache LICENSE_HEADER_YEAR_RANGE := 2022-2023 LICENSE_HEADER_VERSION := v1.28.1 -CONFORMANCE_VERSION := v1.0.0-rc1 +CONFORMANCE_VERSION := v1.0.0-rc2 PROTOC_VERSION ?= 25.1 GRADLE_ARGS ?= @@ -75,8 +75,7 @@ generate: $(PROTOC) buildplugin generateconformance generateexamples ## Generate .PHONY: generateconformance generateconformance: $(PROTOC) buildplugin ## Generate protofiles for conformance tests. buf generate --template conformance/buf.gen.yaml -o conformance conformance/proto - buf generate --template conformance/client/buf.gen.yaml -o conformance/client buf.build/connectrpc/conformance:$(CONFORMANCE_VERSION) - buf generate --template conformance/client/buf.gen.lite.yaml -o conformance/client buf.build/connectrpc/conformance:$(CONFORMANCE_VERSION) + buf generate --template conformance/buf.gen.yaml -o conformance/client buf.build/connectrpc/conformance:$(CONFORMANCE_VERSION) .PHONY: generateexamples generateexamples: $(PROTOC) buildplugin ## Generate proto files for example apps. diff --git a/conformance/client/buf.gen.lite.yaml b/conformance/client/buf.gen.lite.yaml deleted file mode 100644 index 21df319f..00000000 --- a/conformance/client/buf.gen.lite.yaml +++ /dev/null @@ -1,20 +0,0 @@ -version: v1 -managed: - enabled: true - java_package_prefix: com.connectrpc.lite -plugins: - - plugin: connect-kotlin - out: build/generated/sources/bufgen - path: ./protoc-gen-connect-kotlin/build/install/protoc-gen-connect-kotlin/bin/protoc-gen-connect-kotlin - opt: - - generateCallbackMethods=true - - generateCoroutineMethods=true - - generateBlockingUnaryMethods=true - - plugin: java - out: build/generated/sources/bufgen - protoc_path: .tmp/bin/protoc - opt: lite - - plugin: kotlin - out: build/generated/sources/bufgen - protoc_path: .tmp/bin/protoc - opt: lite diff --git a/conformance/client/buf.gen.yaml b/conformance/client/buf.gen.yaml deleted file mode 100644 index 51f0f04e..00000000 --- a/conformance/client/buf.gen.yaml +++ /dev/null @@ -1,17 +0,0 @@ -version: v1 -managed: - enabled: true -plugins: - - plugin: connect-kotlin - out: google-java/build/generated/sources/bufgen - path: ./protoc-gen-connect-kotlin/build/install/protoc-gen-connect-kotlin/bin/protoc-gen-connect-kotlin - opt: - - generateCallbackMethods=true - - generateCoroutineMethods=true - - generateBlockingUnaryMethods=true - - plugin: java - out: google-java/build/generated/sources/bufgen - protoc_path: .tmp/bin/protoc - - plugin: kotlin - out: google-java/build/generated/sources/bufgen - protoc_path: .tmp/bin/protoc diff --git a/conformance/client/build.gradle.kts b/conformance/client/build.gradle.kts index 7f517ad9..266f94e5 100644 --- a/conformance/client/build.gradle.kts +++ b/conformance/client/build.gradle.kts @@ -2,37 +2,9 @@ plugins { kotlin("jvm") } -// This base project contains generated code for the lite runtime -// and depends on the Google Protobuf Java Lite runtime. -// The main client logic is implemented in terms of generated -// code for that lite runtime. -// -// The non-lite runtime excludes the Google Protobuf Java Lite -// runtime and instead uses the full Java runtime. It then can -// adapt from the lite-runtime-generated code by serializing to -// bytes and then de-serializing into non-lite-generated types. - -sourceSets { - main { - java { - srcDir("build/generated/sources/bufgen") - } - } -} - -tasks { - compileKotlin { - kotlinOptions { - // Generated Kotlin code for protobufs uses RequiresOptIn annotation - freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn" - } - } -} - dependencies { implementation(project(":okhttp")) implementation(libs.kotlin.coroutines.core) - implementation(libs.protobuf.kotlinlite) implementation(libs.protobuf.javalite) implementation(libs.okio.core) implementation(libs.okhttp.tls) diff --git a/conformance/client/google-java/build.gradle.kts b/conformance/client/google-java/build.gradle.kts index 98b17ae7..8228be50 100644 --- a/conformance/client/google-java/build.gradle.kts +++ b/conformance/client/google-java/build.gradle.kts @@ -16,12 +16,12 @@ plugins { tasks { compileKotlin { kotlinOptions { - // Generated Kotlin code for protobufs uses OptIn annotation + // Generated Kotlin code for protobuf uses OptIn annotation freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn" } } shadowJar { - archiveBaseName.set("shadow") + archiveFileName.set("conformance-client-java.jar") manifest { attributes(mapOf("Main-Class" to "com.connectrpc.conformance.client.java.MainKt")) } @@ -31,8 +31,6 @@ tasks { } } -// This project contains an alternate copy of the generated -// types, generated for the non-lite runtime. sourceSets { main { java { @@ -43,8 +41,12 @@ sourceSets { dependencies { implementation(project(":conformance:client")) { + // Shared module depends on javalite, just for some core + // classes that are shared across both java and javalite + // runtimes, like ByteString and MessageLite. We must + // exclude it here to avoid any classpath ambiguity since + // we pull in the full runtime for this module. exclude(group = "com.google.protobuf", module = "protobuf-javalite") - exclude(group = "com.google.protobuf", module = "protobuf-kotlinlite") } implementation(project(":extensions:google-java")) implementation(project(":okhttp")) diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaHelpers.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaHelpers.kt new file mode 100644 index 00000000..448a5511 --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaHelpers.kt @@ -0,0 +1,234 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.ConnectException +import com.connectrpc.Headers +import com.connectrpc.SerializationStrategy +import com.connectrpc.conformance.client.adapt.AnyMessage +import com.connectrpc.conformance.client.adapt.ClientCompatRequest +import com.connectrpc.conformance.client.adapt.ClientCompatRequest.HttpVersion +import com.connectrpc.conformance.client.adapt.ClientCompatRequest.TlsCreds +import com.connectrpc.conformance.client.adapt.ClientCompatResponse +import com.connectrpc.conformance.v1.BidiStreamResponse +import com.connectrpc.conformance.v1.ClientCompatRequest.Cancel.CancelTimingCase +import com.connectrpc.conformance.v1.ClientErrorResult +import com.connectrpc.conformance.v1.ClientResponseResult +import com.connectrpc.conformance.v1.ClientStreamResponse +import com.connectrpc.conformance.v1.Codec +import com.connectrpc.conformance.v1.Compression +import com.connectrpc.conformance.v1.ConformancePayload +import com.connectrpc.conformance.v1.Error +import com.connectrpc.conformance.v1.HTTPVersion +import com.connectrpc.conformance.v1.Header +import com.connectrpc.conformance.v1.IdempotentUnaryResponse +import com.connectrpc.conformance.v1.Protocol +import com.connectrpc.conformance.v1.ServerStreamResponse +import com.connectrpc.conformance.v1.StreamType +import com.connectrpc.conformance.v1.UnaryResponse +import com.connectrpc.conformance.v1.UnimplementedResponse +import com.connectrpc.extensions.GoogleJavaJSONStrategy +import com.connectrpc.extensions.GoogleJavaProtobufStrategy +import com.connectrpc.protocols.NetworkProtocol +import com.google.protobuf.Any +import com.google.protobuf.ByteString +import com.google.protobuf.MessageLite + +class JavaHelpers { + companion object { + private const val TYPE_URL_PREFIX = "type.googleapis.com/" + + fun serializationStrategy(codec: ClientCompatRequest.Codec): SerializationStrategy { + return when (codec) { + ClientCompatRequest.Codec.PROTO -> GoogleJavaProtobufStrategy() + ClientCompatRequest.Codec.JSON -> GoogleJavaJSONStrategy() + else -> throw RuntimeException("unsupported codec $codec") + } + } + + fun unmarshalRequest(bytes: ByteArray): ClientCompatRequest { + val msg = com.connectrpc.conformance.v1.ClientCompatRequest.parseFrom(bytes) + return ClientCompatRequestImpl(msg) + } + + fun marshalResponse(resp: ClientCompatResponse): ByteArray { + val builder = com.connectrpc.conformance.v1.ClientCompatResponse + .newBuilder() + .setTestName(resp.testName) + when (val result = resp.result) { + is ClientCompatResponse.Result.ResponseResult -> { + val respBuilder = ClientResponseResult.newBuilder() + .addAllResponseHeaders(toProtoHeaders(result.response.headers)) + .addAllPayloads(toProtoPayloads(result.response.payloads)) + .addAllResponseTrailers(toProtoHeaders(result.response.trailers)) + .setNumUnsentRequests(result.response.numUnsentRequests) + val err = result.response.error + if (err != null) { + respBuilder.setError(toProtoError(err)) + } + builder.setResponse(respBuilder) + } + is ClientCompatResponse.Result.ErrorResult -> { + builder.setError( + ClientErrorResult.newBuilder() + .setMessage(result.error), + ) + } + } + return builder.build().toByteArray() + } + + fun extractPayload(response: MessageLite): MessageLite { + return when (response) { + is UnaryResponse -> response.payload + is IdempotentUnaryResponse -> response.payload + is UnimplementedResponse -> ConformancePayload.getDefaultInstance() + is ClientStreamResponse -> response.payload + is ServerStreamResponse -> response.payload + is BidiStreamResponse -> response.payload + else -> throw RuntimeException("don't know how to extract payload from ${response::class.qualifiedName}") + } + } + + private fun fromProtoHeaders(headers: List
): Headers { + return headers.groupingBy(Header::getName).aggregate { _: String, accumulator: List?, element: Header, _: Boolean -> + accumulator?.plus(element.valueList) ?: element.valueList + } + } + + private fun toProtoHeaders(headers: Headers): List
{ + return headers.map { + Header.newBuilder() + .setName(it.key) + .addAllValue(it.value) + .build() + } + } + + private fun toProtoPayloads(payloads: List): List { + return payloads.map { + if (it is ConformancePayload) { + it + } else { + ConformancePayload.parseFrom(it.toByteArray()) + } + } + } + + private fun toProtoError(ex: ConnectException): Error { + return Error.newBuilder() + .setCode(ex.code.value) + .setMessage(ex.message ?: ex.code.codeName) + .addAllDetails( + ex.details.map { + Any.newBuilder() + .setTypeUrl(toTypeUrl(it.type)) + .setValue(ByteString.copyFrom(it.payload.toByteArray())) + .build() + }, + ) + .build() + } + + private fun toTypeUrl(typeName: String): String { + return if (typeName.contains('/')) typeName else TYPE_URL_PREFIX + typeName + } + } + + private class ClientCompatRequestImpl( + private val msg: com.connectrpc.conformance.v1.ClientCompatRequest, + ) : ClientCompatRequest { + override val testName: String + get() = msg.testName + override val service: String + get() = msg.service + override val method: String + get() = msg.method + override val host: String + get() = msg.host + override val port: Int + get() = msg.port + override val serverTlsCert: ByteString + get() = msg.serverTlsCert + override val clientTlsCreds: TlsCreds? + get() = if (msg.hasClientTlsCreds()) TlsCredsImpl(msg.clientTlsCreds) else null + override val timeoutMs: Int + get() = msg.timeoutMs + override val requestDelayMs: Int + get() = msg.requestDelayMs + override val useGetHttpMethod: Boolean + get() = msg.useGetHttpMethod + override val httpVersion: HttpVersion + get() = when (msg.httpVersion) { + HTTPVersion.HTTP_VERSION_1 -> HttpVersion.HTTP_1_1 + HTTPVersion.HTTP_VERSION_2 -> HttpVersion.HTTP_2 + else -> throw RuntimeException("unsupported HTTP version: ${msg.httpVersion}") + } + override val protocol: NetworkProtocol + get() = when (msg.protocol) { + Protocol.PROTOCOL_CONNECT -> NetworkProtocol.CONNECT + Protocol.PROTOCOL_GRPC -> NetworkProtocol.GRPC + Protocol.PROTOCOL_GRPC_WEB -> NetworkProtocol.GRPC_WEB + else -> throw RuntimeException("unsupported protocol: ${msg.protocol}") + } + override val codec: ClientCompatRequest.Codec + get() = when (msg.codec) { + Codec.CODEC_PROTO -> ClientCompatRequest.Codec.PROTO + Codec.CODEC_JSON -> ClientCompatRequest.Codec.JSON + else -> throw RuntimeException("unsupported codec: ${msg.codec}") + } + override val compression: ClientCompatRequest.Compression + get() = when (msg.compression) { + Compression.COMPRESSION_IDENTITY, Compression.COMPRESSION_UNSPECIFIED -> ClientCompatRequest.Compression.IDENTITY + Compression.COMPRESSION_GZIP -> ClientCompatRequest.Compression.GZIP + else -> throw RuntimeException("unsupported compression: ${msg.compression}") + } + override val streamType: ClientCompatRequest.StreamType + get() = when (msg.streamType) { + StreamType.STREAM_TYPE_UNARY -> ClientCompatRequest.StreamType.UNARY + StreamType.STREAM_TYPE_CLIENT_STREAM -> ClientCompatRequest.StreamType.CLIENT_STREAM + StreamType.STREAM_TYPE_SERVER_STREAM -> ClientCompatRequest.StreamType.SERVER_STREAM + StreamType.STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM -> ClientCompatRequest.StreamType.HALF_DUPLEX_BIDI_STREAM + StreamType.STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM -> ClientCompatRequest.StreamType.FULL_DUPLEX_BIDI_STREAM + else -> throw RuntimeException("unsupported stream type: ${msg.streamType}") + } + override val requestHeaders: Headers + get() = fromProtoHeaders(msg.requestHeadersList) + override val requestMessages: List + get() = msg.requestMessagesList.map { + AnyMessage(it.typeUrl, it.value) + } + override val cancel: ClientCompatRequest.Cancel? + get() = when (msg.cancel.cancelTimingCase) { + CancelTimingCase.CANCELTIMING_NOT_SET, null -> + null + CancelTimingCase.BEFORE_CLOSE_SEND -> + ClientCompatRequest.Cancel.BeforeCloseSend() + CancelTimingCase.AFTER_CLOSE_SEND_MS -> + ClientCompatRequest.Cancel.AfterCloseSendMs(msg.cancel.afterCloseSendMs) + CancelTimingCase.AFTER_NUM_RESPONSES -> + ClientCompatRequest.Cancel.AfterNumResponses(msg.cancel.afterNumResponses) + } + } + + private class TlsCredsImpl( + private val msg: com.connectrpc.conformance.v1.ClientCompatRequest.TLSCreds, + ) : TlsCreds { + override val cert: ByteString + get() = msg.cert + override val key: ByteString + get() = msg.key + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaIdempotentUnaryClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaIdempotentUnaryClient.kt new file mode 100644 index 00000000..04458c65 --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaIdempotentUnaryClient.kt @@ -0,0 +1,47 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.IdempotentUnaryRequest +import com.connectrpc.conformance.v1.IdempotentUnaryResponse +import com.connectrpc.http.Cancelable + +class JavaIdempotentUnaryClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + IdempotentUnaryRequest.getDefaultInstance(), + IdempotentUnaryResponse.getDefaultInstance(), +) { + override suspend fun execute(req: IdempotentUnaryRequest, headers: Headers): ResponseMessage { + return client.idempotentUnary(req, headers) + } + + override fun execute( + req: IdempotentUnaryRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.idempotentUnary(req, headers, onFinish) + } + + override fun blocking(req: IdempotentUnaryRequest, headers: Headers): UnaryBlockingCall { + return client.idempotentUnaryBlocking(req, headers) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt index 297b4c31..1382602c 100644 --- a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt @@ -14,26 +14,27 @@ package com.connectrpc.conformance.client.java -import com.connectrpc.SerializationStrategy import com.connectrpc.conformance.client.adapt.BidiStreamClient import com.connectrpc.conformance.client.adapt.ClientStreamClient import com.connectrpc.conformance.client.adapt.Invoker import com.connectrpc.conformance.client.adapt.ServerStreamClient import com.connectrpc.conformance.client.adapt.UnaryClient import com.connectrpc.conformance.v1.ConformanceServiceClient -import com.connectrpc.extensions.GoogleJavaJSONStrategy -import com.connectrpc.extensions.GoogleJavaProtobufStrategy import com.connectrpc.impl.ProtocolClient -import com.connectrpc.lite.connectrpc.conformance.v1.Codec class JavaInvoker( protocolClient: ProtocolClient, ) : Invoker { private val client = ConformanceServiceClient(protocolClient) + override fun unaryClient(): UnaryClient<*, *> { return JavaUnaryClient(client) } + override fun idempotentUnaryClient(): UnaryClient<*, *> { + return JavaIdempotentUnaryClient(client) + } + override fun unimplementedClient(): UnaryClient<*, *> { return JavaUnimplementedClient(client) } @@ -49,14 +50,4 @@ class JavaInvoker( override fun bidiStreamClient(): BidiStreamClient<*, *> { return JavaBidiStreamClient(client) } - - companion object { - fun serializationStrategy(codec: Codec): SerializationStrategy { - return when (codec) { - Codec.CODEC_PROTO -> GoogleJavaProtobufStrategy() - Codec.CODEC_JSON -> GoogleJavaJSONStrategy() - else -> throw RuntimeException("unsupported codec $codec") - } - } - } } diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt index fec58521..cbdf4e74 100644 --- a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt @@ -18,16 +18,17 @@ import com.connectrpc.conformance.client.Client import com.connectrpc.conformance.client.ConformanceClientLoop fun main(args: Array) { - try { - val invokeStyle = ConformanceClientLoop.parseArgs(args) - val client = Client( - invokerFactory = ::JavaInvoker, - serializationFactory = JavaInvoker::serializationStrategy, - invokeStyle = invokeStyle, - ) - ConformanceClientLoop.run(System.`in`, System.out, client) - } catch (e: Exception) { - e.printStackTrace(System.err) - Runtime.getRuntime().exit(1) - } + val invokeStyle = ConformanceClientLoop.parseArgs(args) + val loop = ConformanceClientLoop( + JavaHelpers::unmarshalRequest, + JavaHelpers::marshalResponse, + ) + val client = Client( + invokerFactory = ::JavaInvoker, + serializationFactory = JavaHelpers::serializationStrategy, + invokeStyle = invokeStyle, + payloadExtractor = JavaHelpers::extractPayload, + ) + loop.run(System.`in`, System.out, client) + // TODO: catch any exception for better error output/logging? } diff --git a/conformance/client/google-javalite/build.gradle.kts b/conformance/client/google-javalite/build.gradle.kts index 6fda62a7..75cdb82e 100644 --- a/conformance/client/google-javalite/build.gradle.kts +++ b/conformance/client/google-javalite/build.gradle.kts @@ -14,8 +14,14 @@ plugins { } tasks { + compileKotlin { + kotlinOptions { + // Generated Kotlin code for protobuf uses RequiresOptIn annotation + freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn" + } + } shadowJar { - archiveBaseName.set("shadow") + archiveFileName.set("conformance-client-javalite.jar") manifest { attributes(mapOf("Main-Class" to "com.connectrpc.conformance.client.javalite.MainKt")) } @@ -25,12 +31,21 @@ tasks { } } +sourceSets { + main { + java { + srcDir("build/generated/sources/bufgen") + } + } +} + dependencies { implementation(project(":conformance:client")) implementation(project(":extensions:google-javalite")) implementation(project(":okhttp")) implementation(libs.kotlin.coroutines.core) implementation(libs.protobuf.kotlinlite) + implementation(libs.protobuf.javalite) implementation(libs.okio.core) implementation(libs.okhttp.tls) } diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt index 9cf9a6bf..24505af9 100644 --- a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt @@ -16,9 +16,9 @@ package com.connectrpc.conformance.client.javalite import com.connectrpc.Headers import com.connectrpc.conformance.client.adapt.BidiStreamClient -import com.connectrpc.lite.connectrpc.conformance.v1.BidiStreamRequest -import com.connectrpc.lite.connectrpc.conformance.v1.BidiStreamResponse -import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.BidiStreamRequest +import com.connectrpc.conformance.v1.BidiStreamResponse +import com.connectrpc.conformance.v1.ConformanceServiceClient class JavaLiteBidiStreamClient( private val client: ConformanceServiceClient, diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt index 61c8d048..ff641710 100644 --- a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt @@ -16,9 +16,9 @@ package com.connectrpc.conformance.client.javalite import com.connectrpc.Headers import com.connectrpc.conformance.client.adapt.ClientStreamClient -import com.connectrpc.lite.connectrpc.conformance.v1.ClientStreamRequest -import com.connectrpc.lite.connectrpc.conformance.v1.ClientStreamResponse -import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.ClientStreamRequest +import com.connectrpc.conformance.v1.ClientStreamResponse +import com.connectrpc.conformance.v1.ConformanceServiceClient class JavaLiteClientStreamClient( private val client: ConformanceServiceClient, diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteHelpers.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteHelpers.kt new file mode 100644 index 00000000..c1a0f8e5 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteHelpers.kt @@ -0,0 +1,232 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.ConnectException +import com.connectrpc.Headers +import com.connectrpc.SerializationStrategy +import com.connectrpc.conformance.client.adapt.AnyMessage +import com.connectrpc.conformance.client.adapt.ClientCompatRequest +import com.connectrpc.conformance.client.adapt.ClientCompatRequest.HttpVersion +import com.connectrpc.conformance.client.adapt.ClientCompatRequest.TlsCreds +import com.connectrpc.conformance.client.adapt.ClientCompatResponse +import com.connectrpc.conformance.v1.BidiStreamResponse +import com.connectrpc.conformance.v1.ClientCompatRequest.Cancel.CancelTimingCase +import com.connectrpc.conformance.v1.ClientErrorResult +import com.connectrpc.conformance.v1.ClientResponseResult +import com.connectrpc.conformance.v1.ClientStreamResponse +import com.connectrpc.conformance.v1.Codec +import com.connectrpc.conformance.v1.Compression +import com.connectrpc.conformance.v1.ConformancePayload +import com.connectrpc.conformance.v1.Error +import com.connectrpc.conformance.v1.HTTPVersion +import com.connectrpc.conformance.v1.Header +import com.connectrpc.conformance.v1.IdempotentUnaryResponse +import com.connectrpc.conformance.v1.Protocol +import com.connectrpc.conformance.v1.ServerStreamResponse +import com.connectrpc.conformance.v1.StreamType +import com.connectrpc.conformance.v1.UnaryResponse +import com.connectrpc.conformance.v1.UnimplementedResponse +import com.connectrpc.extensions.GoogleJavaLiteProtobufStrategy +import com.connectrpc.protocols.NetworkProtocol +import com.google.protobuf.Any +import com.google.protobuf.ByteString +import com.google.protobuf.MessageLite + +class JavaLiteHelpers { + companion object { + private const val TYPE_URL_PREFIX = "type.googleapis.com/" + + fun serializationStrategy(codec: ClientCompatRequest.Codec): SerializationStrategy { + return when (codec) { + ClientCompatRequest.Codec.PROTO -> GoogleJavaLiteProtobufStrategy() + ClientCompatRequest.Codec.JSON -> throw RuntimeException("Java Lite does not support JSON") + } + } + + fun unmarshalRequest(bytes: ByteArray): ClientCompatRequest { + val msg = com.connectrpc.conformance.v1.ClientCompatRequest.parseFrom(bytes) + return ClientCompatRequestImpl(msg) + } + + fun marshalResponse(resp: ClientCompatResponse): ByteArray { + val builder = com.connectrpc.conformance.v1.ClientCompatResponse + .newBuilder() + .setTestName(resp.testName) + when (val result = resp.result) { + is ClientCompatResponse.Result.ResponseResult -> { + val respBuilder = ClientResponseResult.newBuilder() + .addAllResponseHeaders(toProtoHeaders(result.response.headers)) + .addAllPayloads(toProtoPayloads(result.response.payloads)) + .addAllResponseTrailers(toProtoHeaders(result.response.trailers)) + .setNumUnsentRequests(result.response.numUnsentRequests) + val err = result.response.error + if (err != null) { + respBuilder.setError(toProtoError(err)) + } + builder.setResponse(respBuilder) + } + is ClientCompatResponse.Result.ErrorResult -> { + builder.setError( + ClientErrorResult.newBuilder() + .setMessage(result.error), + ) + } + } + return builder.build().toByteArray() + } + + fun extractPayload(response: MessageLite): MessageLite { + return when (response) { + is UnaryResponse -> response.payload + is IdempotentUnaryResponse -> response.payload + is UnimplementedResponse -> ConformancePayload.getDefaultInstance() + is ClientStreamResponse -> response.payload + is ServerStreamResponse -> response.payload + is BidiStreamResponse -> response.payload + else -> throw RuntimeException("don't know how to extract payload from ${response::class.qualifiedName}") + } + } + + private fun fromProtoHeaders(headers: List
): Headers { + return headers.groupingBy(Header::getName).aggregate { _: String, accumulator: List?, element: Header, _: Boolean -> + accumulator?.plus(element.valueList) ?: element.valueList + } + } + + private fun toProtoHeaders(headers: Headers): List
{ + return headers.map { + Header.newBuilder() + .setName(it.key) + .addAllValue(it.value) + .build() + } + } + + private fun toProtoPayloads(payloads: List): List { + return payloads.map { + if (it is ConformancePayload) { + it + } else { + ConformancePayload.parseFrom(it.toByteArray()) + } + } + } + + private fun toProtoError(ex: ConnectException): Error { + return Error.newBuilder() + .setCode(ex.code.value) + .setMessage(ex.message ?: ex.code.codeName) + .addAllDetails( + ex.details.map { + Any.newBuilder() + .setTypeUrl(toTypeUrl(it.type)) + .setValue(ByteString.copyFrom(it.payload.toByteArray())) + .build() + }, + ) + .build() + } + + private fun toTypeUrl(typeName: String): String { + return if (typeName.contains('/')) typeName else TYPE_URL_PREFIX + typeName + } + } + + private class ClientCompatRequestImpl( + private val msg: com.connectrpc.conformance.v1.ClientCompatRequest, + ) : ClientCompatRequest { + override val testName: String + get() = msg.testName + override val service: String + get() = msg.service + override val method: String + get() = msg.method + override val host: String + get() = msg.host + override val port: Int + get() = msg.port + override val serverTlsCert: ByteString + get() = msg.serverTlsCert + override val clientTlsCreds: TlsCreds? + get() = if (msg.hasClientTlsCreds()) TlsCredsImpl(msg.clientTlsCreds) else null + override val timeoutMs: Int + get() = msg.timeoutMs + override val requestDelayMs: Int + get() = msg.requestDelayMs + override val useGetHttpMethod: Boolean + get() = msg.useGetHttpMethod + override val httpVersion: HttpVersion + get() = when (msg.httpVersion) { + HTTPVersion.HTTP_VERSION_1 -> HttpVersion.HTTP_1_1 + HTTPVersion.HTTP_VERSION_2 -> HttpVersion.HTTP_2 + else -> throw RuntimeException("unsupported HTTP version: ${msg.httpVersion}") + } + override val protocol: NetworkProtocol + get() = when (msg.protocol) { + Protocol.PROTOCOL_CONNECT -> NetworkProtocol.CONNECT + Protocol.PROTOCOL_GRPC -> NetworkProtocol.GRPC + Protocol.PROTOCOL_GRPC_WEB -> NetworkProtocol.GRPC_WEB + else -> throw RuntimeException("unsupported protocol: ${msg.protocol}") + } + override val codec: ClientCompatRequest.Codec + get() = when (msg.codec) { + Codec.CODEC_PROTO -> ClientCompatRequest.Codec.PROTO + Codec.CODEC_JSON -> ClientCompatRequest.Codec.JSON + else -> throw RuntimeException("unsupported codec: ${msg.codec}") + } + override val compression: ClientCompatRequest.Compression + get() = when (msg.compression) { + Compression.COMPRESSION_IDENTITY, Compression.COMPRESSION_UNSPECIFIED -> ClientCompatRequest.Compression.IDENTITY + Compression.COMPRESSION_GZIP -> ClientCompatRequest.Compression.GZIP + else -> throw RuntimeException("unsupported compression: ${msg.compression}") + } + override val streamType: ClientCompatRequest.StreamType + get() = when (msg.streamType) { + StreamType.STREAM_TYPE_UNARY -> ClientCompatRequest.StreamType.UNARY + StreamType.STREAM_TYPE_CLIENT_STREAM -> ClientCompatRequest.StreamType.CLIENT_STREAM + StreamType.STREAM_TYPE_SERVER_STREAM -> ClientCompatRequest.StreamType.SERVER_STREAM + StreamType.STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM -> ClientCompatRequest.StreamType.HALF_DUPLEX_BIDI_STREAM + StreamType.STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM -> ClientCompatRequest.StreamType.FULL_DUPLEX_BIDI_STREAM + else -> throw RuntimeException("unsupported stream type: ${msg.streamType}") + } + override val requestHeaders: Headers + get() = fromProtoHeaders(msg.requestHeadersList) + override val requestMessages: List + get() = msg.requestMessagesList.map { + AnyMessage(it.typeUrl, it.value) + } + override val cancel: ClientCompatRequest.Cancel? + get() = when (msg.cancel.cancelTimingCase) { + CancelTimingCase.CANCELTIMING_NOT_SET, null -> + null + CancelTimingCase.BEFORE_CLOSE_SEND -> + ClientCompatRequest.Cancel.BeforeCloseSend() + CancelTimingCase.AFTER_CLOSE_SEND_MS -> + ClientCompatRequest.Cancel.AfterCloseSendMs(msg.cancel.afterCloseSendMs) + CancelTimingCase.AFTER_NUM_RESPONSES -> + ClientCompatRequest.Cancel.AfterNumResponses(msg.cancel.afterNumResponses) + } + } + + private class TlsCredsImpl( + private val msg: com.connectrpc.conformance.v1.ClientCompatRequest.TLSCreds, + ) : TlsCreds { + override val cert: ByteString + get() = msg.cert + override val key: ByteString + get() = msg.key + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteIdempotentUnaryClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteIdempotentUnaryClient.kt new file mode 100644 index 00000000..f0674afb --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteIdempotentUnaryClient.kt @@ -0,0 +1,50 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.IdempotentUnaryRequest +import com.connectrpc.conformance.v1.IdempotentUnaryResponse +import com.connectrpc.http.Cancelable + +class JavaLiteIdempotentUnaryClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + IdempotentUnaryRequest.getDefaultInstance(), + IdempotentUnaryResponse.getDefaultInstance(), +) { + override suspend fun execute( + req: IdempotentUnaryRequest, + headers: Headers, + ): ResponseMessage { + return client.idempotentUnary(req, headers) + } + + override fun execute( + req: IdempotentUnaryRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.idempotentUnary(req, headers, onFinish) + } + + override fun blocking(req: IdempotentUnaryRequest, headers: Headers): UnaryBlockingCall { + return client.idempotentUnaryBlocking(req, headers) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt index e5d758f2..2817acca 100644 --- a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt @@ -14,25 +14,27 @@ package com.connectrpc.conformance.client.javalite -import com.connectrpc.SerializationStrategy import com.connectrpc.conformance.client.adapt.BidiStreamClient import com.connectrpc.conformance.client.adapt.ClientStreamClient import com.connectrpc.conformance.client.adapt.Invoker import com.connectrpc.conformance.client.adapt.ServerStreamClient import com.connectrpc.conformance.client.adapt.UnaryClient -import com.connectrpc.extensions.GoogleJavaLiteProtobufStrategy +import com.connectrpc.conformance.v1.ConformanceServiceClient import com.connectrpc.impl.ProtocolClient -import com.connectrpc.lite.connectrpc.conformance.v1.Codec -import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient class JavaLiteInvoker( protocolClient: ProtocolClient, ) : Invoker { private val client = ConformanceServiceClient(protocolClient) + override fun unaryClient(): UnaryClient<*, *> { return JavaLiteUnaryClient(client) } + override fun idempotentUnaryClient(): UnaryClient<*, *> { + return JavaLiteIdempotentUnaryClient(client) + } + override fun unimplementedClient(): UnaryClient<*, *> { return JavaLiteUnimplementedClient(client) } @@ -48,14 +50,4 @@ class JavaLiteInvoker( override fun bidiStreamClient(): BidiStreamClient<*, *> { return JavaLiteBidiStreamClient(client) } - - companion object { - fun serializationStrategy(codec: Codec): SerializationStrategy { - return when (codec) { - Codec.CODEC_PROTO -> GoogleJavaLiteProtobufStrategy() - Codec.CODEC_JSON -> throw RuntimeException("Java Lite does not support JSON") - else -> throw RuntimeException("unsupported codec $codec") - } - } - } } diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt index a845dab2..4fe57e4b 100644 --- a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt @@ -17,9 +17,9 @@ package com.connectrpc.conformance.client.javalite import com.connectrpc.Headers import com.connectrpc.conformance.client.adapt.ResponseStream import com.connectrpc.conformance.client.adapt.ServerStreamClient -import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient -import com.connectrpc.lite.connectrpc.conformance.v1.ServerStreamRequest -import com.connectrpc.lite.connectrpc.conformance.v1.ServerStreamResponse +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.ServerStreamRequest +import com.connectrpc.conformance.v1.ServerStreamResponse class JavaLiteServerStreamClient( private val client: ConformanceServiceClient, diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt index a919a446..01d86310 100644 --- a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt @@ -18,10 +18,10 @@ import com.connectrpc.Headers import com.connectrpc.ResponseMessage import com.connectrpc.UnaryBlockingCall import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.UnaryRequest +import com.connectrpc.conformance.v1.UnaryResponse import com.connectrpc.http.Cancelable -import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient -import com.connectrpc.lite.connectrpc.conformance.v1.UnaryRequest -import com.connectrpc.lite.connectrpc.conformance.v1.UnaryResponse class JavaLiteUnaryClient( private val client: ConformanceServiceClient, diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt index a2ba8f11..e60e22d8 100644 --- a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt @@ -18,10 +18,10 @@ import com.connectrpc.Headers import com.connectrpc.ResponseMessage import com.connectrpc.UnaryBlockingCall import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.UnimplementedRequest +import com.connectrpc.conformance.v1.UnimplementedResponse import com.connectrpc.http.Cancelable -import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient -import com.connectrpc.lite.connectrpc.conformance.v1.UnimplementedRequest -import com.connectrpc.lite.connectrpc.conformance.v1.UnimplementedResponse class JavaLiteUnimplementedClient( private val client: ConformanceServiceClient, diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt index 4b369bb2..4a6b9386 100644 --- a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt @@ -19,11 +19,16 @@ import com.connectrpc.conformance.client.ConformanceClientLoop fun main(args: Array) { val invokeStyle = ConformanceClientLoop.parseArgs(args) + val loop = ConformanceClientLoop( + JavaLiteHelpers::unmarshalRequest, + JavaLiteHelpers::marshalResponse, + ) val client = Client( invokerFactory = ::JavaLiteInvoker, - serializationFactory = JavaLiteInvoker::serializationStrategy, + serializationFactory = JavaLiteHelpers::serializationStrategy, invokeStyle = invokeStyle, + payloadExtractor = JavaLiteHelpers::extractPayload, ) - ConformanceClientLoop.run(System.`in`, System.out, client) + loop.run(System.`in`, System.out, client) // TODO: catch any exception for better error output/logging? } diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt index a9885447..631776ee 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt @@ -16,25 +16,23 @@ package com.connectrpc.conformance.client import com.connectrpc.ProtocolClientConfig import com.connectrpc.RequestCompression -import com.connectrpc.ResponseMessage import com.connectrpc.SerializationStrategy import com.connectrpc.compression.GzipCompressionPool +import com.connectrpc.conformance.client.adapt.AnyMessage import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.client.adapt.ClientCompatRequest +import com.connectrpc.conformance.client.adapt.ClientCompatRequest.Codec +import com.connectrpc.conformance.client.adapt.ClientCompatRequest.Compression +import com.connectrpc.conformance.client.adapt.ClientCompatRequest.HttpVersion +import com.connectrpc.conformance.client.adapt.ClientCompatRequest.StreamType +import com.connectrpc.conformance.client.adapt.ClientResponseResult import com.connectrpc.conformance.client.adapt.ClientStreamClient import com.connectrpc.conformance.client.adapt.Invoker import com.connectrpc.conformance.client.adapt.ServerStreamClient import com.connectrpc.conformance.client.adapt.UnaryClient import com.connectrpc.impl.ProtocolClient -import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatRequest -import com.connectrpc.lite.connectrpc.conformance.v1.ClientResponseResult -import com.connectrpc.lite.connectrpc.conformance.v1.Codec -import com.connectrpc.lite.connectrpc.conformance.v1.Compression -import com.connectrpc.lite.connectrpc.conformance.v1.HTTPVersion -import com.connectrpc.lite.connectrpc.conformance.v1.Protocol -import com.connectrpc.lite.connectrpc.conformance.v1.StreamType import com.connectrpc.okhttp.ConnectOkHttpClient import com.connectrpc.protocols.GETConfiguration -import com.connectrpc.protocols.NetworkProtocol import com.google.protobuf.MessageLite import okhttp3.OkHttpClient import okhttp3.tls.HandshakeCertificates @@ -49,39 +47,67 @@ import java.security.spec.PKCS8EncodedKeySpec import java.time.Duration import java.util.Base64 import kotlin.reflect.cast -import kotlin.reflect.safeCast +/** + * The conformance client. This contains the logic for invoking an + * RPC and returning a representation of its result. + */ class Client( private val invokerFactory: (ProtocolClient) -> Invoker, private val serializationFactory: (Codec) -> SerializationStrategy, private val invokeStyle: UnaryClient.InvokeStyle, + private val payloadExtractor: (MessageLite) -> MessageLite, ) { + companion object { + private const val CONFORMANCE_SERVICE_NAME = "connectrpc.conformance.v1.ConformanceService" + private const val UNARY_METHOD_NAME = "Unary" + private const val IDEMPOTENT_UNARY_METHOD_NAME = "IdempotentUnary" + private const val UNIMPLEMENTED_METHOD_NAME = "Unimplemented" + private const val CLIENT_STREAM_METHOD_NAME = "ClientStream" + private const val SERVER_STREAM_METHOD_NAME = "ServerStream" + private const val BIDI_STREAM_METHOD_NAME = "BidiStream" + + private const val UNARY_REQUEST_NAME = "connectrpc.conformance.v1.UnaryRequest" + private const val IDEMPOTENT_UNARY_REQUEST_NAME = "connectrpc.conformance.v1.IdempotentUnaryRequest" + private const val UNIMPLEMENTED_REQUEST_NAME = "connectrpc.conformance.v1.UnimplementedRequest" + } + suspend fun handle(req: ClientCompatRequest): ClientResponseResult { - val invoker = invokerFactory(getProtocolClient(req)) - val service = req.service.orEmpty() - if (service != "connectrpc.conformance.v1.ConformanceService") { - throw RuntimeException("service $service is not known") - } + val (httpClient, protocolClient) = getClient(req) + try { + val invoker = invokerFactory(protocolClient) + val service = req.service + if (service != CONFORMANCE_SERVICE_NAME) { + throw RuntimeException("service $service is not known") + } - return when (val method = req.method.orEmpty()) { - "Unary" -> handleUnary(invoker.unaryClient(), req) - // TODO: IdempotentUnary - "Unimplemented" -> handleUnary(invoker.unimplementedClient(), req) - "ClientStream" -> handleClient(invoker.clientStreamClient(), req) - "ServerStream" -> handleServer(invoker.serverStreamClient(), req) - "BidiStream" -> if (req.streamType == StreamType.STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM) { - handleFullDuplexBidi(invoker.bidiStreamClient(), req) - } else { - handleHalfDuplexBidi(invoker.bidiStreamClient(), req) + return when (req.method) { + UNARY_METHOD_NAME -> handleUnary(invoker.unaryClient(), UNARY_REQUEST_NAME, req) + IDEMPOTENT_UNARY_METHOD_NAME -> handleUnary(invoker.idempotentUnaryClient(), IDEMPOTENT_UNARY_REQUEST_NAME, req) + UNIMPLEMENTED_METHOD_NAME -> handleUnary(invoker.unimplementedClient(), UNIMPLEMENTED_REQUEST_NAME, req) + CLIENT_STREAM_METHOD_NAME -> handleClient(invoker.clientStreamClient(), req) + SERVER_STREAM_METHOD_NAME -> handleServer(invoker.serverStreamClient(), req) + BIDI_STREAM_METHOD_NAME -> handleBidi(invoker.bidiStreamClient(), req) + else -> throw RuntimeException("method ${req.method} is not known") } - else -> throw RuntimeException("method $method is not known") + } finally { + // Clean-up HTTP client. + httpClient.connectionPool.evictAll() + httpClient.dispatcher.executorService.shutdown() } } - private suspend fun handleUnary( + private suspend fun < + Req : MessageLite, + Resp : MessageLite, + > handleUnary( client: UnaryClient, + requestType: String, req: ClientCompatRequest, ): ClientResponseResult { + if (req.streamType != StreamType.UNARY) { + throw RuntimeException("specified method ${req.method} is unary but stream type indicates ${req.streamType}") + } TODO("implement me") } @@ -89,6 +115,9 @@ class Client( client: ClientStreamClient, req: ClientCompatRequest, ): ClientResponseResult { + if (req.streamType != StreamType.CLIENT_STREAM) { + throw RuntimeException("specified method ${req.method} is client-stream but stream type indicates ${req.streamType}") + } TODO("implement me") } @@ -96,9 +125,26 @@ class Client( client: ServerStreamClient, req: ClientCompatRequest, ): ClientResponseResult { + if (req.streamType != StreamType.SERVER_STREAM) { + throw RuntimeException("specified method ${req.method} is server-stream but stream type indicates ${req.streamType}") + } TODO("implement me") } + private suspend fun handleBidi( + client: BidiStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + return when (req.streamType) { + StreamType.HALF_DUPLEX_BIDI_STREAM -> + handleHalfDuplexBidi(client, req) + StreamType.FULL_DUPLEX_BIDI_STREAM -> + handleFullDuplexBidi(client, req) + else -> + throw RuntimeException("specified method ${req.method} is bidi-stream but stream type indicates ${req.streamType}") + } + } + private suspend fun handleHalfDuplexBidi( client: BidiStreamClient, req: ClientCompatRequest, @@ -113,63 +159,66 @@ class Client( TODO("implement me") } - private fun getProtocolClient(req: ClientCompatRequest): ProtocolClient { + private fun getClient(req: ClientCompatRequest): Pair { // TODO: cache/re-use clients instead of creating a new one for every request val serializationStrategy = serializationFactory(req.codec) val useTls = !req.serverTlsCert.isEmpty val scheme = if (useTls) "https" else "http" val host = "$scheme://${req.host}:${req.port}" var clientBuilder = OkHttpClient.Builder() - .protocols(listOf(asOkHttpProtocol(req.httpVersion, useTls))) + .protocols(asOkHttpProtocols(req.httpVersion, useTls)) .connectTimeout(Duration.ofMinutes(1)) if (useTls) { val certs = certs(req) clientBuilder = clientBuilder.sslSocketFactory(certs.sslSocketFactory(), certs.trustManager) } - if (req.hasTimeoutMs()) { + if (req.timeoutMs != 0) { clientBuilder = clientBuilder.callTimeout(Duration.ofMillis(req.timeoutMs.toLong())) } val getConfig = if (req.useGetHttpMethod) GETConfiguration.Enabled else GETConfiguration.Disabled val requestCompression = - if (req.compression == Compression.COMPRESSION_GZIP) { - RequestCompression(10, GzipCompressionPool) + if (req.compression == Compression.GZIP) { + RequestCompression(0, GzipCompressionPool) } else { null } val compressionPools = - if (req.compression == Compression.COMPRESSION_GZIP) { + if (req.compression == Compression.GZIP) { listOf(GzipCompressionPool) } else { emptyList() } - return ProtocolClient( - httpClient = ConnectOkHttpClient(clientBuilder.build()), - ProtocolClientConfig( - host = host, - serializationStrategy = serializationStrategy, - networkProtocol = asNetworkProtocol(req.protocol), - getConfiguration = getConfig, - requestCompression = requestCompression, - compressionPools = compressionPools, + val httpClient = clientBuilder.build() + return Pair( + httpClient, + ProtocolClient( + httpClient = ConnectOkHttpClient(httpClient), + ProtocolClientConfig( + host = host, + serializationStrategy = serializationStrategy, + networkProtocol = req.protocol, + getConfiguration = getConfig, + requestCompression = requestCompression, + compressionPools = compressionPools, + ), ), ) } - private fun asNetworkProtocol(protocol: Protocol): NetworkProtocol { - return when (protocol) { - Protocol.PROTOCOL_CONNECT -> NetworkProtocol.CONNECT - Protocol.PROTOCOL_GRPC -> NetworkProtocol.GRPC - Protocol.PROTOCOL_GRPC_WEB -> NetworkProtocol.GRPC_WEB - else -> throw RuntimeException("unsupported protocol: $protocol") - } - } - - private fun asOkHttpProtocol(httpVersion: HTTPVersion, useTls: Boolean): okhttp3.Protocol { + private fun asOkHttpProtocols(httpVersion: HttpVersion, useTls: Boolean): List { return when (httpVersion) { - HTTPVersion.HTTP_VERSION_1 -> okhttp3.Protocol.HTTP_1_1 - HTTPVersion.HTTP_VERSION_2 -> if (useTls) okhttp3.Protocol.HTTP_2 else okhttp3.Protocol.H2_PRIOR_KNOWLEDGE - else -> throw RuntimeException("unsupported HTTP version: $httpVersion") + HttpVersion.HTTP_1_1 -> listOf(okhttp3.Protocol.HTTP_1_1) + HttpVersion.HTTP_2 -> + if (useTls) { + // okhttp *requires* that protocols contains HTTP_1_1 + // or H2_PRIOR_KNOWLEDGE. So we leave 1.1 in here, but + // expect HTTP/2 to always be used in practice since it + // should be negotiated during TLS handshake, + listOf(okhttp3.Protocol.HTTP_2, okhttp3.Protocol.HTTP_1_1) + } else { + listOf(okhttp3.Protocol.H2_PRIOR_KNOWLEDGE) + } } } @@ -180,15 +229,13 @@ class Client( val result = HandshakeCertificates.Builder() .addTrustedCertificate(certificateAuthority) - if (!req.hasClientTlsCreds()) { - return result.build() - } + val creds = req.clientTlsCreds ?: return result.build() - val certificate = req.clientTlsCreds.cert.newInput().use { stream -> + val certificate = creds.cert.newInput().use { stream -> CertificateFactory.getInstance("X.509").generateCertificate(stream) as X509Certificate } val publicKey = certificate.publicKey as RSAPublicKey - val privateKeyBytes = req.clientTlsCreds.key.newInput().bufferedReader().use { stream -> + val privateKeyBytes = creds.key.newInput().bufferedReader().use { stream -> val lines = stream.readLines().toMutableList() // Remove BEGIN RSA PRIVATE KEY / END RSA PRIVATE KEY lines lines.removeFirst() @@ -205,39 +252,19 @@ class Client( .build() } - private fun convert( - from: From, - template: To, - ): To { - val clazz = template::class - return clazz.safeCast(from) - ?: clazz.cast( - template - .newBuilderForType() - .mergeFrom(from.toByteString()) - .build(), - ) - } - - private fun convert( - from: ResponseMessage, - template: To, - ): ResponseMessage { - return when (from) { - is ResponseMessage.Success -> { - ResponseMessage.Success( - message = convert(from.message, template), - code = from.code, - headers = from.headers, - trailers = from.trailers, - ) - } - is ResponseMessage.Failure -> { - // Value does not actually contain a reference - // to response type, so we can just cast it. - @Suppress("UNCHECKED_CAST") - from as ResponseMessage - } + private fun fromAny( + any: AnyMessage, + template: M, + typeName: String, + ): M { + val pos = any.typeUrl.lastIndexOf('/') + val actualTypeName = any.typeUrl.substring(pos + 1) + if (actualTypeName != typeName) { + throw RuntimeException("expecting request message to be $typeName, instead got $actualTypeName") } + val msgClass = template::class + return msgClass.cast( + template.newBuilderForType().mergeFrom(any.value).build(), + ) } } diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt index f2bf019d..652cc45e 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt @@ -14,84 +14,100 @@ package com.connectrpc.conformance.client +import com.connectrpc.conformance.client.adapt.ClientCompatRequest +import com.connectrpc.conformance.client.adapt.ClientCompatResponse import com.connectrpc.conformance.client.adapt.UnaryClient.InvokeStyle -import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatRequest -import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatResponse -import com.connectrpc.lite.connectrpc.conformance.v1.ClientErrorResult -import com.google.protobuf.ByteString import kotlinx.coroutines.runBlocking import java.io.EOFException import java.io.InputStream import java.io.OutputStream -class ConformanceClientLoop { - companion object { - fun run(input: InputStream, output: OutputStream, client: Client) = runBlocking { - // TODO: issue RPCs in parallel - while (true) { - var result: ClientCompatResponse - val req = readRequest(input) ?: return@runBlocking // end of stream - try { - val resp = client.handle(req) - result = ClientCompatResponse.newBuilder().setResponse(resp).build() - } catch (e: Exception) { - val msg = if (e.message.orEmpty() == "") { - e::class.qualifiedName - } else { - "${e::class.qualifiedName}: ${e.message}" - } - result = ClientCompatResponse.newBuilder() - .setError(ClientErrorResult.newBuilder().setMessage(msg)) - .build() +/** + * The main loop that a conformance client program executes. This + * loop reads requests from stdin, uses a Client to issue the RPC + * described by the request, and writes the results of the RPC to + * stdout. + */ +class ConformanceClientLoop( + private val requestUnmarshaller: (ByteArray) -> ClientCompatRequest, + private val responseMarshaller: (ClientCompatResponse) -> ByteArray, +) { + fun run(input: InputStream, output: OutputStream, client: Client) = runBlocking { + // TODO: issue RPCs in parallel + while (true) { + var result: ClientCompatResponse.Result + val req = readRequest(input) ?: return@runBlocking // end of stream + try { + val resp = client.handle(req) + result = ClientCompatResponse.Result.ResponseResult(resp) + } catch (e: Exception) { + val msg = if (e.message.orEmpty() == "") { + e::class.qualifiedName.orEmpty() + } else { + "${e::class.qualifiedName}: ${e.message}" } - writeResponse(output, result) + result = ClientCompatResponse.Result.ErrorResult(msg) } + writeResponse( + output, + ClientCompatResponse( + testName = req.testName, + result = result, + ), + ) } + } - private fun readRequest(input: InputStream): ClientCompatRequest? { - val len = input.readInt() ?: return null - val data = input.readN(len) - ?: throw EOFException("unexpected EOF: read 0 of $len expected message bytes") - return ClientCompatRequest.parseFrom(ByteString.copyFrom(data)) - } + private fun readRequest(input: InputStream): ClientCompatRequest? { + val len = input.readInt() ?: return null + val data = input.readN(len) + ?: throw EOFException("unexpected EOF: read 0 of $len expected message bytes") + return requestUnmarshaller(data) + } - private fun writeResponse(output: OutputStream, resp: ClientCompatResponse) { - val len = resp.serializedSize - val prefix = ByteArray(4) - prefix[0] = len.ushr(24).toByte() - prefix[1] = len.ushr(16).toByte() - prefix[2] = len.ushr(8).toByte() - prefix[3] = len.toByte() - output.write(prefix) - resp.writeTo(output) - } + private fun writeResponse(output: OutputStream, resp: ClientCompatResponse) { + val respBytes = responseMarshaller(resp) + val prefix = ByteArray(4) + val len = respBytes.size + prefix[0] = len.ushr(24).toByte() + prefix[1] = len.ushr(16).toByte() + prefix[2] = len.ushr(8).toByte() + prefix[3] = len.toByte() + output.write(prefix) + output.write(respBytes) + } - private fun InputStream.readN(len: Int): ByteArray? { - val bytes = ByteArray(len) - var offs = 0 - var remain = len - while (remain > 0) { - val n = this.read(bytes, offs, remain) - if (n == 0) { + private fun InputStream.readN(len: Int): ByteArray? { + val bytes = ByteArray(len) + var offs = 0 + var remain = len + while (remain > 0) { + val n = this.read(bytes, offs, remain) + when (n) { + -1, 0 -> { if (offs == 0) { return null } throw EOFException("unexpected EOF: read $offs of $len expected bytes") } - offs += n - remain -= n + else -> { + offs += n + remain -= n + } } - return bytes } + return bytes + } - private fun InputStream.readInt(): Int? { - val bytes = this.readN(4) ?: return null - return bytes[0].toInt().and(0xff).shl(24) or - bytes[1].toInt().and(0xff).shl(16) or - bytes[2].toInt().and(0xff).shl(8) or - bytes[3].toInt().and(0xff) - } + private fun InputStream.readInt(): Int? { + val bytes = this.readN(4) ?: return null + return bytes[0].toInt().and(0xff).shl(24) or + bytes[1].toInt().and(0xff).shl(16) or + bytes[2].toInt().and(0xff).shl(8) or + bytes[3].toInt().and(0xff) + } + companion object { fun parseArgs(args: Array): InvokeStyle { if (args.isEmpty()) { return InvokeStyle.SUSPEND diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/AnyMessage.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/AnyMessage.kt new file mode 100644 index 00000000..93cc1956 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/AnyMessage.kt @@ -0,0 +1,28 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.google.protobuf.ByteString + +/** + * Corresponds to a google.protobuf.Any message. This is distinct + * from the com.google.protobuf.Any Java class so that it can be + * used without relying on a particular runtime (e.g. the lite vs. + * standard runtimes). + */ +class AnyMessage( + val typeUrl: String, + val value: ByteString, +) diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientCompatRequest.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientCompatRequest.kt new file mode 100644 index 00000000..36422e5e --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientCompatRequest.kt @@ -0,0 +1,91 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.Headers +import com.connectrpc.protocols.NetworkProtocol +import com.google.protobuf.ByteString + +/** + * ClientCompatRequest represents a request to the conformance client. + * It describes the properties of an RPC that the client should issue. + * This corresponds to the connectrpc.conformance.v1.ClientCompatRequest + * proto message. + * + * We manually define this interface and then have implementations that + * adapt generated code to this interface. This allows us to have a + * single client implementation, using a single representation of the + * request, which could be backed by a generated message that uses either + * the standard or the lite runtime. + * + * Unfortunately, the standard and lite runtimes are incompatible so we + * can't directly use either of them as the singular representation that + * the client implementation uses. So this abstraction is needed so the + * same client code can be used for both runtimes. + */ +interface ClientCompatRequest { + val testName: String + val service: String + val method: String + val host: String + val port: Int + val serverTlsCert: ByteString + val clientTlsCreds: TlsCreds? + val timeoutMs: Int + val requestDelayMs: Int + val useGetHttpMethod: Boolean + val httpVersion: HttpVersion + val protocol: NetworkProtocol + val codec: Codec + val compression: Compression + val streamType: StreamType + val requestHeaders: Headers + val requestMessages: List + val cancel: Cancel? + + interface TlsCreds { + val cert: ByteString + val key: ByteString + } + + sealed class Cancel { + class BeforeCloseSend : Cancel() + class AfterCloseSendMs(val millis: Int) : Cancel() + class AfterNumResponses(val num: Int) : Cancel() + } + + enum class HttpVersion { + HTTP_1_1, + HTTP_2, + } + + enum class Codec { + PROTO, + JSON, + } + + enum class Compression { + IDENTITY, + GZIP, + } + + enum class StreamType { + UNARY, + CLIENT_STREAM, + SERVER_STREAM, + HALF_DUPLEX_BIDI_STREAM, + FULL_DUPLEX_BIDI_STREAM, + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientCompatResponse.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientCompatResponse.kt new file mode 100644 index 00000000..876a7e1e --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientCompatResponse.kt @@ -0,0 +1,38 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +/** + * Represents the response of a conformance request. This + * describes the RPC result of invoking an RPC for a particular + * conformance test case. + * + * This corresponds to the connectrpc.conformance.v1.ClientCompatResponse + * proto message. Its presence is to provide a representation that + * doesn't rely on either the standard or lite Protobuf runtime. + * + * This can represent a result received from an RPC server or an + * error that prevented the RPC from being invoked. + */ +data class ClientCompatResponse( + val testName: String, + val result: Result, +) { + + sealed class Result { + class ResponseResult(val response: ClientResponseResult) : Result() + class ErrorResult(val error: String) : Result() + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientResponseResult.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientResponseResult.kt new file mode 100644 index 00000000..1d8c5724 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientResponseResult.kt @@ -0,0 +1,34 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.ConnectException +import com.connectrpc.Headers +import com.google.protobuf.MessageLite + +/** + * Represents the result of issuing an RPC. + * + * This corresponds to the connectrpc.conformance.v1.ClientResponseResult + * proto message. Its presence is to provide a representation that + * doesn't rely on either the standard or lite Protobuf runtime. + */ +class ClientResponseResult( + val headers: Headers = emptyMap(), + val payloads: List = emptyList(), + val trailers: Headers = emptyMap(), + val error: ConnectException? = null, + val numUnsentRequests: Int = 0, +) diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt index 6941c1d7..01691ec4 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt @@ -30,7 +30,6 @@ import com.google.protobuf.MessageLite * @param Req The request message type * @param Resp The response message type */ - abstract class ClientStreamClient( val reqTemplate: Req, val respTemplate: Resp, diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt index 08ab3419..30986914 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt @@ -14,8 +14,15 @@ package com.connectrpc.conformance.client.adapt +/** + * An RPC stub that allows for invoking RPC methods. + * Each method of Invoker corresponds to an RPC method + * and returns a client stub that can be used to actually + * invoke that RPC. + */ interface Invoker { fun unaryClient(): UnaryClient<*, *> + fun idempotentUnaryClient(): UnaryClient<*, *> fun unimplementedClient(): UnaryClient<*, *> fun clientStreamClient(): ClientStreamClient<*, *> fun serverStreamClient(): ServerStreamClient<*, *> diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt index d7ca2463..c331e202 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt @@ -41,6 +41,17 @@ abstract class UnaryClient( abstract fun blocking(req: Req, headers: Headers): UnaryBlockingCall + /** + * Executes the unary RPC using the given invocation style, request + * message, and request headers. The given callback is invoked when + * the operation completes. + * + * This signature resembles the one above that takes a callback, but + * will adapt the call to the suspend or blocking signatures if so + * directed by the given InvokeStyle. This allows a caller to use a + * single shape to invoke the RPC, but actually exercise any/all of + * the above three signatures. + */ suspend fun execute( style: InvokeStyle, req: Req, @@ -75,9 +86,36 @@ abstract class UnaryClient( } } + /** + * The style of invocation, one each for the three different + * ways to invoke a unary RPC. + */ enum class InvokeStyle { + /** + * Indicates the callback-based async signature, which + * invokes the method with the following signature: + * ``` + * fun execute(Req, Headers, (ResponseMessage)->Unit): Cancelable + * ``` + */ CALLBACK, + + /** + * Indicates the suspend-based async signature, which + * invokes the method with the following signature: + * ``` + * suspend fun execute(Req, Headers): ResponseMessage + * ``` + */ SUSPEND, + + /** + * Indicates the blocking signature, which invokes the + * method with the following signature: + * ``` + * fun blocking(Req, Headers): UnaryBlockingCall + * ``` + */ BLOCKING, } }