From 9bd518f16c236ff8f9828f0d43f4e7a73c227b18 Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Mon, 18 Dec 2023 10:52:51 -0500 Subject: [PATCH] Adds new conformance/client module - This contains the new conformance client, for the new conformance tests - The base module has all of the logic and uses the Java Lite runtime (since it is lowest common denominator) - There are google-java and google-javalite modules which have the main functions for running the clients. The google-java module swaps out the javalite runtime for the java runtime. The client module (with shared logic) can convert from javalite generated messages to java generated messages by serializing and deserializing - The adapt package in the client module provides a glimpse of what I think better stream APIs would look like. I'd like to update the actual stream interfaces to more closely match this in the future. For now, the code just adapts exising interfaces to these. - To test the different unary invocation styles, there is an enum for each style and a method in UnaryClient that is an adapter: a single interface for the actual conformance client to use (that looks like the Callback style), but under the hood it will call the suspend, callback, or blocking methods based on the desired invocation style. - The actual client logic is TODO, in various handle* methods of the Client class. This felt like a big enough chunk already, so I figured it would be good to get this reviewed before plowing through the rest. --- Makefile | 2 + conformance/client/buf.gen.lite.yaml | 20 ++ conformance/client/buf.gen.yaml | 17 ++ conformance/client/build.gradle.kts | 30 +++ .../client/google-java/build.gradle.kts | 35 +++ .../client/java/JavaBidiStreamClient.kt | 32 +++ .../client/java/JavaClientStreamClient.kt | 32 +++ .../conformance/client/java/JavaInvoker.kt | 62 +++++ .../client/java/JavaServerStreamClient.kt | 35 +++ .../client/java/JavaUnaryClient.kt | 47 ++++ .../client/java/JavaUnimplementedClient.kt | 47 ++++ .../conformance/client/java/Main.kt | 19 ++ .../client/google-javalite/build.gradle.kts | 13 + .../javalite/JavaLiteBidiStreamClient.kt | 32 +++ .../javalite/JavaLiteClientStreamClient.kt | 32 +++ .../client/javalite/JavaLiteInvoker.kt | 61 +++++ .../javalite/JavaLiteServerStreamClient.kt | 35 +++ .../client/javalite/JavaLiteUnaryClient.kt | 50 ++++ .../javalite/JavaLiteUnimplementedClient.kt | 47 ++++ .../conformance/client/javalite/Main.kt | 19 ++ .../connectrpc/conformance/client/Client.kt | 228 ++++++++++++++++++ .../client/ConformanceClientLoop.kt | 96 ++++++++ .../client/adapt/BidiStreamClient.kt | 65 +++++ .../client/adapt/ClientStreamClient.kt | 86 +++++++ .../conformance/client/adapt/Invoker.kt | 23 ++ .../conformance/client/adapt/RequestStream.kt | 42 ++++ .../client/adapt/ResponseStream.kt | 82 +++++++ .../client/adapt/ServerStreamClient.kt | 33 +++ .../conformance/client/adapt/UnaryClient.kt | 83 +++++++ settings.gradle.kts | 3 + 30 files changed, 1408 insertions(+) create mode 100644 conformance/client/buf.gen.lite.yaml create mode 100644 conformance/client/buf.gen.yaml create mode 100644 conformance/client/build.gradle.kts create mode 100644 conformance/client/google-java/build.gradle.kts create mode 100644 conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaBidiStreamClient.kt create mode 100644 conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaClientStreamClient.kt create mode 100644 conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt create mode 100644 conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaServerStreamClient.kt create mode 100644 conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnaryClient.kt create mode 100644 conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnimplementedClient.kt create mode 100644 conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt create mode 100644 conformance/client/google-javalite/build.gradle.kts create mode 100644 conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt create mode 100644 conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt create mode 100644 conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt create mode 100644 conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt create mode 100644 conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt create mode 100644 conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt create mode 100644 conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt create mode 100644 conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt diff --git a/Makefile b/Makefile index 6e546674..270282c8 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,8 @@ generate: $(PROTOC) buildplugin generateconformance generateexamples ## Generate .PHONY: generateconformance generateconformance: $(PROTOC) buildplugin ## Generate protofiles for conformance tests. buf generate --template conformance/buf.gen.yaml -o conformance conformance/proto + buf generate --template conformance/client/buf.gen.yaml -o conformance/client buf.build/connectrpc/conformance:v1.0.0-rc1 + buf generate --template conformance/client/buf.gen.lite.yaml -o conformance/client buf.build/connectrpc/conformance:v1.0.0-rc1 .PHONY: generateexamples generateexamples: $(PROTOC) buildplugin ## Generate proto files for example apps. diff --git a/conformance/client/buf.gen.lite.yaml b/conformance/client/buf.gen.lite.yaml new file mode 100644 index 00000000..21df319f --- /dev/null +++ b/conformance/client/buf.gen.lite.yaml @@ -0,0 +1,20 @@ +version: v1 +managed: + enabled: true + java_package_prefix: com.connectrpc.lite +plugins: + - plugin: connect-kotlin + out: build/generated/sources/bufgen + path: ./protoc-gen-connect-kotlin/build/install/protoc-gen-connect-kotlin/bin/protoc-gen-connect-kotlin + opt: + - generateCallbackMethods=true + - generateCoroutineMethods=true + - generateBlockingUnaryMethods=true + - plugin: java + out: build/generated/sources/bufgen + protoc_path: .tmp/bin/protoc + opt: lite + - plugin: kotlin + out: build/generated/sources/bufgen + protoc_path: .tmp/bin/protoc + opt: lite diff --git a/conformance/client/buf.gen.yaml b/conformance/client/buf.gen.yaml new file mode 100644 index 00000000..51f0f04e --- /dev/null +++ b/conformance/client/buf.gen.yaml @@ -0,0 +1,17 @@ +version: v1 +managed: + enabled: true +plugins: + - plugin: connect-kotlin + out: google-java/build/generated/sources/bufgen + path: ./protoc-gen-connect-kotlin/build/install/protoc-gen-connect-kotlin/bin/protoc-gen-connect-kotlin + opt: + - generateCallbackMethods=true + - generateCoroutineMethods=true + - generateBlockingUnaryMethods=true + - plugin: java + out: google-java/build/generated/sources/bufgen + protoc_path: .tmp/bin/protoc + - plugin: kotlin + out: google-java/build/generated/sources/bufgen + protoc_path: .tmp/bin/protoc diff --git a/conformance/client/build.gradle.kts b/conformance/client/build.gradle.kts new file mode 100644 index 00000000..21cabbc9 --- /dev/null +++ b/conformance/client/build.gradle.kts @@ -0,0 +1,30 @@ +plugins { + kotlin("jvm") +} + +// This base project contains generated code for the lite runtime +// and depends on the Google Protobuf Java Lite runtime. +// The main client logic is implemented in terms of generated +// code for that lite runtime. +// +// The non-lite runtime excludes the Google Protobuf Java Lite +// runtime and instead uses the full Java runtime. It then can +// adapt from the lite-runtime-generated code by serializing to +// bytes and then de-serializing into non-lite-generated types. + +sourceSets { + main { + java { + srcDir("build/generated/sources/bufgen") + } + } +} + +dependencies { + implementation(project(":okhttp")) + implementation(libs.kotlin.coroutines.core) + implementation(libs.protobuf.kotlin) + implementation(libs.protobuf.javalite) + implementation(libs.okio.core) + implementation(libs.okhttp.tls) +} diff --git a/conformance/client/google-java/build.gradle.kts b/conformance/client/google-java/build.gradle.kts new file mode 100644 index 00000000..844bae92 --- /dev/null +++ b/conformance/client/google-java/build.gradle.kts @@ -0,0 +1,35 @@ +plugins { + kotlin("jvm") +} + +tasks { + compileKotlin { + kotlinOptions { + // Generated Kotlin code for protobufs uses OptIn annotation + freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn" + } + } +} + +// This project contains an alternate copy of the generated +// types, generated for the non-lite runtime. +sourceSets { + main { + java { + srcDir("build/generated/sources/bufgen") + } + } +} + +dependencies { + implementation(project(":conformance:client")) { + exclude(group = "com.google.protobuf", module = "protobuf-javalite") + } + implementation(project(":extensions:google-java")) + implementation(project(":okhttp")) + implementation(libs.kotlin.coroutines.core) + implementation(libs.protobuf.kotlin) + implementation(libs.protobuf.java) + implementation(libs.okio.core) + implementation(libs.okhttp.tls) +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaBidiStreamClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaBidiStreamClient.kt new file mode 100644 index 00000000..70512fcf --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaBidiStreamClient.kt @@ -0,0 +1,32 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.v1.BidiStreamRequest +import com.connectrpc.conformance.v1.BidiStreamResponse +import com.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaBidiStreamClient( + private val client: ConformanceServiceClient, +) : BidiStreamClient( + BidiStreamRequest.getDefaultInstance(), + BidiStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(headers: Headers): BidiStream { + return BidiStream.new(client.bidiStream(headers)) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaClientStreamClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaClientStreamClient.kt new file mode 100644 index 00000000..b178947b --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaClientStreamClient.kt @@ -0,0 +1,32 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.conformance.v1.ClientStreamRequest +import com.connectrpc.conformance.v1.ClientStreamResponse +import com.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaClientStreamClient( + private val client: ConformanceServiceClient, +) : ClientStreamClient( + ClientStreamRequest.getDefaultInstance(), + ClientStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(headers: Headers): ClientStream { + return ClientStream.new(client.clientStream(headers)) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt new file mode 100644 index 00000000..297b4c31 --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt @@ -0,0 +1,62 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.SerializationStrategy +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.conformance.client.adapt.Invoker +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.extensions.GoogleJavaJSONStrategy +import com.connectrpc.extensions.GoogleJavaProtobufStrategy +import com.connectrpc.impl.ProtocolClient +import com.connectrpc.lite.connectrpc.conformance.v1.Codec + +class JavaInvoker( + protocolClient: ProtocolClient, +) : Invoker { + private val client = ConformanceServiceClient(protocolClient) + override fun unaryClient(): UnaryClient<*, *> { + return JavaUnaryClient(client) + } + + override fun unimplementedClient(): UnaryClient<*, *> { + return JavaUnimplementedClient(client) + } + + override fun clientStreamClient(): ClientStreamClient<*, *> { + return JavaClientStreamClient(client) + } + + override fun serverStreamClient(): ServerStreamClient<*, *> { + return JavaServerStreamClient(client) + } + + override fun bidiStreamClient(): BidiStreamClient<*, *> { + return JavaBidiStreamClient(client) + } + + companion object { + fun serializationStrategy(codec: Codec): SerializationStrategy { + return when (codec) { + Codec.CODEC_PROTO -> GoogleJavaProtobufStrategy() + Codec.CODEC_JSON -> GoogleJavaJSONStrategy() + else -> throw RuntimeException("unsupported codec $codec") + } + } + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaServerStreamClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaServerStreamClient.kt new file mode 100644 index 00000000..e882a4ff --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaServerStreamClient.kt @@ -0,0 +1,35 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.ResponseStream +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.ServerStreamRequest +import com.connectrpc.conformance.v1.ServerStreamResponse + +class JavaServerStreamClient( + private val client: ConformanceServiceClient, +) : ServerStreamClient( + ServerStreamRequest.getDefaultInstance(), + ServerStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(req: ServerStreamRequest, headers: Headers): ResponseStream { + val stream = client.serverStream(headers) + stream.sendAndClose(req) + return ResponseStream.new(stream) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnaryClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnaryClient.kt new file mode 100644 index 00000000..f41a95a8 --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnaryClient.kt @@ -0,0 +1,47 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.UnaryRequest +import com.connectrpc.conformance.v1.UnaryResponse +import com.connectrpc.http.Cancelable + +class JavaUnaryClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + UnaryRequest.getDefaultInstance(), + UnaryResponse.getDefaultInstance(), +) { + override suspend fun execute(req: UnaryRequest, headers: Headers): ResponseMessage { + return client.unary(req, headers) + } + + override fun execute( + req: UnaryRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.unary(req, headers, onFinish) + } + + override fun blocking(req: UnaryRequest, headers: Headers): UnaryBlockingCall { + return client.unaryBlocking(req, headers) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnimplementedClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnimplementedClient.kt new file mode 100644 index 00000000..34a3981a --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnimplementedClient.kt @@ -0,0 +1,47 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.UnimplementedRequest +import com.connectrpc.conformance.v1.UnimplementedResponse +import com.connectrpc.http.Cancelable + +class JavaUnimplementedClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + UnimplementedRequest.getDefaultInstance(), + UnimplementedResponse.getDefaultInstance(), +) { + override suspend fun execute(req: UnimplementedRequest, headers: Headers): ResponseMessage { + return client.unimplemented(req, headers) + } + + override fun execute( + req: UnimplementedRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.unimplemented(req, headers, onFinish) + } + + override fun blocking(req: UnimplementedRequest, headers: Headers): UnaryBlockingCall { + return client.unimplementedBlocking(req, headers) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt new file mode 100644 index 00000000..afd07c80 --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt @@ -0,0 +1,19 @@ +package com.connectrpc.conformance.client.java + +import com.connectrpc.conformance.client.Client +import com.connectrpc.conformance.client.ConformanceClientLoop + +fun main(args: Array) { + try { + val invokeStyle = ConformanceClientLoop.parseArgs(args) + val client = Client( + invokerFactory = ::JavaInvoker, + serializationFactory = JavaInvoker::serializationStrategy, + invokeStyle = invokeStyle, + ) + ConformanceClientLoop.run(System.`in`, System.out, client) + } catch (e: Exception) { + e.printStackTrace(System.err) + Runtime.getRuntime().exit(1) + } +} diff --git a/conformance/client/google-javalite/build.gradle.kts b/conformance/client/google-javalite/build.gradle.kts new file mode 100644 index 00000000..aa6cf7b9 --- /dev/null +++ b/conformance/client/google-javalite/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + kotlin("jvm") +} + +dependencies { + implementation(project(":conformance:client")) + implementation(project(":extensions:google-javalite")) + implementation(project(":okhttp")) + implementation(libs.kotlin.coroutines.core) + implementation(libs.protobuf.kotlin) + implementation(libs.okio.core) + implementation(libs.okhttp.tls) +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt new file mode 100644 index 00000000..9cf9a6bf --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt @@ -0,0 +1,32 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.lite.connectrpc.conformance.v1.BidiStreamRequest +import com.connectrpc.lite.connectrpc.conformance.v1.BidiStreamResponse +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaLiteBidiStreamClient( + private val client: ConformanceServiceClient, +) : BidiStreamClient( + BidiStreamRequest.getDefaultInstance(), + BidiStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(headers: Headers): BidiStream { + return BidiStream.new(client.bidiStream(headers)) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt new file mode 100644 index 00000000..61c8d048 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt @@ -0,0 +1,32 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.lite.connectrpc.conformance.v1.ClientStreamRequest +import com.connectrpc.lite.connectrpc.conformance.v1.ClientStreamResponse +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaLiteClientStreamClient( + private val client: ConformanceServiceClient, +) : ClientStreamClient( + ClientStreamRequest.getDefaultInstance(), + ClientStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(headers: Headers): ClientStream { + return ClientStream.new(client.clientStream(headers)) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt new file mode 100644 index 00000000..e5d758f2 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt @@ -0,0 +1,61 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.SerializationStrategy +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.conformance.client.adapt.Invoker +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.extensions.GoogleJavaLiteProtobufStrategy +import com.connectrpc.impl.ProtocolClient +import com.connectrpc.lite.connectrpc.conformance.v1.Codec +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaLiteInvoker( + protocolClient: ProtocolClient, +) : Invoker { + private val client = ConformanceServiceClient(protocolClient) + override fun unaryClient(): UnaryClient<*, *> { + return JavaLiteUnaryClient(client) + } + + override fun unimplementedClient(): UnaryClient<*, *> { + return JavaLiteUnimplementedClient(client) + } + + override fun clientStreamClient(): ClientStreamClient<*, *> { + return JavaLiteClientStreamClient(client) + } + + override fun serverStreamClient(): ServerStreamClient<*, *> { + return JavaLiteServerStreamClient(client) + } + + override fun bidiStreamClient(): BidiStreamClient<*, *> { + return JavaLiteBidiStreamClient(client) + } + + companion object { + fun serializationStrategy(codec: Codec): SerializationStrategy { + return when (codec) { + Codec.CODEC_PROTO -> GoogleJavaLiteProtobufStrategy() + Codec.CODEC_JSON -> throw RuntimeException("Java Lite does not support JSON") + else -> throw RuntimeException("unsupported codec $codec") + } + } + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt new file mode 100644 index 00000000..a845dab2 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt @@ -0,0 +1,35 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.ResponseStream +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.lite.connectrpc.conformance.v1.ServerStreamRequest +import com.connectrpc.lite.connectrpc.conformance.v1.ServerStreamResponse + +class JavaLiteServerStreamClient( + private val client: ConformanceServiceClient, +) : ServerStreamClient( + ServerStreamRequest.getDefaultInstance(), + ServerStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(req: ServerStreamRequest, headers: Headers): ResponseStream { + val stream = client.serverStream(headers) + stream.sendAndClose(req) + return ResponseStream.new(stream) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt new file mode 100644 index 00000000..a919a446 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt @@ -0,0 +1,50 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.http.Cancelable +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.lite.connectrpc.conformance.v1.UnaryRequest +import com.connectrpc.lite.connectrpc.conformance.v1.UnaryResponse + +class JavaLiteUnaryClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + UnaryRequest.getDefaultInstance(), + UnaryResponse.getDefaultInstance(), +) { + override suspend fun execute( + req: UnaryRequest, + headers: Headers, + ): ResponseMessage { + return client.unary(req, headers) + } + + override fun execute( + req: UnaryRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.unary(req, headers, onFinish) + } + + override fun blocking(req: UnaryRequest, headers: Headers): UnaryBlockingCall { + return client.unaryBlocking(req, headers) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt new file mode 100644 index 00000000..a2ba8f11 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt @@ -0,0 +1,47 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.http.Cancelable +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.lite.connectrpc.conformance.v1.UnimplementedRequest +import com.connectrpc.lite.connectrpc.conformance.v1.UnimplementedResponse + +class JavaLiteUnimplementedClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + UnimplementedRequest.getDefaultInstance(), + UnimplementedResponse.getDefaultInstance(), +) { + override suspend fun execute(req: UnimplementedRequest, headers: Headers): ResponseMessage { + return client.unimplemented(req, headers) + } + + override fun execute( + req: UnimplementedRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.unimplemented(req, headers, onFinish) + } + + override fun blocking(req: UnimplementedRequest, headers: Headers): UnaryBlockingCall { + return client.unimplementedBlocking(req, headers) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt new file mode 100644 index 00000000..16cce508 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt @@ -0,0 +1,19 @@ +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.conformance.client.Client +import com.connectrpc.conformance.client.ConformanceClientLoop + +fun main(args: Array) { + try { + val invokeStyle = ConformanceClientLoop.parseArgs(args) + val client = Client( + invokerFactory = ::JavaLiteInvoker, + serializationFactory = JavaLiteInvoker::serializationStrategy, + invokeStyle = invokeStyle, + ) + ConformanceClientLoop.run(System.`in`, System.out, client) + } catch (e: Exception) { + e.printStackTrace(System.err) + Runtime.getRuntime().exit(1) + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt new file mode 100644 index 00000000..ccfd8214 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt @@ -0,0 +1,228 @@ +package com.connectrpc.conformance.client + +import com.connectrpc.ProtocolClientConfig +import com.connectrpc.RequestCompression +import com.connectrpc.ResponseMessage +import com.connectrpc.SerializationStrategy +import com.connectrpc.compression.GzipCompressionPool +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.conformance.client.adapt.Invoker +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.impl.ProtocolClient +import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatRequest +import com.connectrpc.lite.connectrpc.conformance.v1.ClientResponseResult +import com.connectrpc.lite.connectrpc.conformance.v1.Codec +import com.connectrpc.lite.connectrpc.conformance.v1.Compression +import com.connectrpc.lite.connectrpc.conformance.v1.HTTPVersion +import com.connectrpc.lite.connectrpc.conformance.v1.Protocol +import com.connectrpc.lite.connectrpc.conformance.v1.StreamType +import com.connectrpc.okhttp.ConnectOkHttpClient +import com.connectrpc.protocols.GETConfiguration +import com.connectrpc.protocols.NetworkProtocol +import com.google.protobuf.MessageLite +import okhttp3.OkHttpClient +import okhttp3.tls.HandshakeCertificates +import okhttp3.tls.HeldCertificate +import java.security.KeyFactory +import java.security.KeyPair +import java.security.cert.CertificateFactory +import java.security.cert.X509Certificate +import java.security.interfaces.RSAPrivateKey +import java.security.interfaces.RSAPublicKey +import java.security.spec.PKCS8EncodedKeySpec +import java.time.Duration +import java.util.Base64 +import kotlin.reflect.cast +import kotlin.reflect.safeCast + +class Client( + private val invokerFactory: (ProtocolClient) -> Invoker, + private val serializationFactory: (Codec) -> SerializationStrategy, + private val invokeStyle: UnaryClient.InvokeStyle, +) { + suspend fun handle(req: ClientCompatRequest): ClientResponseResult { + val invoker = invokerFactory(getProtocolClient(req)) + val service = req.service.orEmpty() + if (service != "connectrpc.conformance.v1.ConformanceService") { + throw RuntimeException("service $service is not known") + } + + return when (val method = req.method.orEmpty()) { + "Unary" -> handleUnary(invoker.unaryClient(), req) + // TODO: IdempotentUnary + "Unimplemented" -> handleUnary(invoker.unimplementedClient(), req) + "ClientStream" -> handleClient(invoker.clientStreamClient(), req) + "ServerStream" -> handleServer(invoker.serverStreamClient(), req) + "BidiStream" -> if (req.streamType == StreamType.STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM) { + handleFullDuplexBidi(invoker.bidiStreamClient(), req) + } else { + handleHalfDuplexBidi(invoker.bidiStreamClient(), req) + } + else -> throw RuntimeException("method $method is not known") + } + } + + private suspend fun handleUnary( + client: UnaryClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private suspend fun handleClient( + client: ClientStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private suspend fun handleServer( + client: ServerStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private suspend fun handleHalfDuplexBidi( + client: BidiStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private suspend fun handleFullDuplexBidi( + client: BidiStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private fun getProtocolClient(req: ClientCompatRequest): ProtocolClient { + // TODO: cache/re-use clients instead of creating a new one for every request + val serializationStrategy = serializationFactory(req.codec) + val scheme = if (req.serverTlsCert.isEmpty) "http" else "https" + val host = "$scheme://${req.host}:${req.port}" + var clientBuilder = OkHttpClient.Builder() + .protocols(listOf(asOkHttpProtocol(req.httpVersion))) + .connectTimeout(Duration.ofMinutes(1)) + if (!req.serverTlsCert.isEmpty) { + val certs = certs(req) + clientBuilder = clientBuilder.sslSocketFactory(certs.sslSocketFactory(), certs.trustManager) + } + if (req.hasTimeoutMs()) { + clientBuilder = clientBuilder.callTimeout(Duration.ofMillis(req.timeoutMs.toLong())) + } + + val getConfig = if (req.useGetHttpMethod) GETConfiguration.Enabled else GETConfiguration.Disabled + val requestCompression = + if (req.compression == Compression.COMPRESSION_GZIP) { + RequestCompression(10, GzipCompressionPool) + } else { + null + } + val compressionPools = + if (req.compression == Compression.COMPRESSION_GZIP) { + listOf(GzipCompressionPool) + } else { + emptyList() + } + return ProtocolClient( + httpClient = ConnectOkHttpClient(clientBuilder.build()), + ProtocolClientConfig( + host = host, + serializationStrategy = serializationStrategy, + networkProtocol = asNetworkProtocol(req.protocol), + getConfiguration = getConfig, + requestCompression = requestCompression, + compressionPools = compressionPools, + ), + ) + } + + private fun asNetworkProtocol(protocol: Protocol): NetworkProtocol { + return when (protocol) { + Protocol.PROTOCOL_CONNECT -> NetworkProtocol.CONNECT + Protocol.PROTOCOL_GRPC -> NetworkProtocol.GRPC + Protocol.PROTOCOL_GRPC_WEB -> NetworkProtocol.GRPC_WEB + else -> throw RuntimeException("unsupported protocol: $protocol") + } + } + + private fun asOkHttpProtocol(httpVersion: HTTPVersion): okhttp3.Protocol { + return when (httpVersion) { + HTTPVersion.HTTP_VERSION_1 -> okhttp3.Protocol.HTTP_1_1 + HTTPVersion.HTTP_VERSION_2 -> okhttp3.Protocol.HTTP_2 + else -> throw RuntimeException("unsupported HTTP version: $httpVersion") + } + } + + private fun certs(req: ClientCompatRequest): HandshakeCertificates { + val certificateAuthority = req.serverTlsCert.newInput().use { stream -> + CertificateFactory.getInstance("X.509").generateCertificate(stream) as X509Certificate + } + val result = HandshakeCertificates.Builder() + .addTrustedCertificate(certificateAuthority) + + if (!req.hasClientTlsCreds()) { + return result.build() + } + + val certificate = req.clientTlsCreds.cert.newInput().use { stream -> + CertificateFactory.getInstance("X.509").generateCertificate(stream) as X509Certificate + } + val publicKey = certificate.publicKey as RSAPublicKey + val privateKeyBytes = req.clientTlsCreds.key.newInput().bufferedReader().use { stream -> + val lines = stream.readLines().toMutableList() + // Remove BEGIN RSA PRIVATE KEY / END RSA PRIVATE KEY lines + lines.removeFirst() + lines.removeLast() + Base64.getDecoder().decode(lines.joinToString(separator = "")) + } + val privateKey = KeyFactory.getInstance("RSA") + .generatePrivate(PKCS8EncodedKeySpec(privateKeyBytes)) as RSAPrivateKey + if (publicKey.modulus != privateKey.modulus) { + throw Exception("key does not match cert") // or other error handling + } + return result + .heldCertificate(HeldCertificate(KeyPair(publicKey, privateKey), certificate)) + .build() + } + + private fun convert( + from: From, + template: To, + ): To { + val clazz = template::class + return clazz.safeCast(from) + ?: clazz.cast( + template + .newBuilderForType() + .mergeFrom(from.toByteString()) + .build(), + ) + } + + private fun convert( + from: ResponseMessage, + template: To, + ): ResponseMessage { + return when (from) { + is ResponseMessage.Success -> { + ResponseMessage.Success( + message = convert(from.message, template), + code = from.code, + headers = from.headers, + trailers = from.trailers, + ) + } + is ResponseMessage.Failure -> { + // Value does not actually contain a reference + // to response type, so we can just cast it. + @Suppress("UNCHECKED_CAST") + from as ResponseMessage + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt new file mode 100644 index 00000000..41976143 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt @@ -0,0 +1,96 @@ +package com.connectrpc.conformance.client + +import com.connectrpc.conformance.client.adapt.UnaryClient.InvokeStyle +import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatRequest +import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatResponse +import com.connectrpc.lite.connectrpc.conformance.v1.ClientErrorResult +import com.google.protobuf.ByteString +import kotlinx.coroutines.runBlocking +import java.io.EOFException +import java.io.InputStream +import java.io.OutputStream + +class ConformanceClientLoop { + companion object { + fun run(input: InputStream, output: OutputStream, client: Client) = runBlocking { + // TODO: issue RPCs in parallel + while (true) { + var result: ClientCompatResponse + val req = readRequest(input) ?: return@runBlocking // end of stream + try { + val resp = client.handle(req) + result = ClientCompatResponse.newBuilder().setResponse(resp).build() + } catch (e: Exception) { + val msg = if (e.message.orEmpty() == "") { + e::class.qualifiedName + } else { + "${e::class.qualifiedName}: ${e.message}" + } + result = ClientCompatResponse.newBuilder() + .setError(ClientErrorResult.newBuilder().setMessage(msg)) + .build() + } + writeResponse(output, result) + } + } + + private fun readRequest(input: InputStream): ClientCompatRequest? { + val len = input.readInt() ?: return null + val data = input.readN(len) + ?: throw EOFException("unexpected EOF: read 0 of $len expected message bytes") + return ClientCompatRequest.parseFrom(ByteString.copyFrom(data)) + } + + private fun writeResponse(output: OutputStream, resp: ClientCompatResponse) { + val len = resp.serializedSize + val prefix = ByteArray(4) + prefix[0] = len.ushr(24).toByte() + prefix[1] = len.ushr(16).toByte() + prefix[2] = len.ushr(8).toByte() + prefix[3] = len.toByte() + output.write(prefix) + resp.writeTo(output) + } + + private fun InputStream.readN(len: Int): ByteArray? { + val bytes = ByteArray(len) + var offs = 0 + var remain = len + while (remain > 0) { + val n = this.read(bytes, offs, remain) + if (n == 0) { + if (offs == 0) { + return null + } + throw EOFException("unexpected EOF: read $offs of $len expected bytes") + } + offs += n + remain -= n + } + return bytes + } + + private fun InputStream.readInt(): Int? { + val bytes = this.readN(4) ?: return null + return bytes[0].toInt().shl(24) or + bytes[1].toInt().shl(16) or + bytes[2].toInt().shl(8) or + bytes[3].toInt() + } + + fun parseArgs(args: Array): InvokeStyle { + if (args.isEmpty()) { + return InvokeStyle.SUSPEND + } + if (args.size > 1) { + throw IllegalArgumentException("expecting exactly one args (invoke style), but got ${args.size}") + } + return when (args[0].lowercase()) { + "suspend" -> InvokeStyle.SUSPEND + "blocking" -> InvokeStyle.BLOCKING + "callback" -> InvokeStyle.CALLBACK + else -> throw IllegalArgumentException("expecting one args to be 'suspend', 'blocking', or 'callback', but got '${args[0]}'") + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt new file mode 100644 index 00000000..0865269b --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt @@ -0,0 +1,65 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.BidirectionalStreamInterface +import com.connectrpc.Headers +import com.google.protobuf.MessageLite + +/** + * The client of a bidi-stream RPC operation. A bidi-stream + * operation allows the client to upload zero or more request + * messages and to download zero or more response messages. + * Furthermore, bidi-stream operations can be "full duplex", + * which means that sending requests can be interleaved with + * receiving responses (whereas other stream types always + * require sending the request(s) first, and then receiving + * the response(s)). + * + * @param Req The request message type + * @param Resp The response message type + */ +abstract class BidiStreamClient( + val reqTemplate: Req, + val respTemplate: Resp, +) { + abstract suspend fun execute(headers: Headers): BidiStream + + /** + * A BidiStream combines a request stream and a response stream. + * + * @param Req The request message type + * @param Resp The response message type + */ + interface BidiStream { + fun requests(): RequestStream + fun responses(): ResponseStream + companion object { + fun new(underlying: BidirectionalStreamInterface): BidiStream { + val reqStream = RequestStream.new(underlying) + val respStream = ResponseStream.new(underlying) + return object : BidiStream { + override fun requests(): RequestStream { + return reqStream + } + + override fun responses(): ResponseStream { + return respStream + } + } + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt new file mode 100644 index 00000000..6941c1d7 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt @@ -0,0 +1,86 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.ClientOnlyStreamInterface +import com.connectrpc.Code +import com.connectrpc.ConnectException +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.google.protobuf.MessageLite + +/** + * The client of a client-stream RPC operation. A client-stream + * operation allows the client to upload zero or more request + * messages and then receive either a single response or an + * error when done. + * + * @param Req The request message type + * @param Resp The response message type + */ + +abstract class ClientStreamClient( + val reqTemplate: Req, + val respTemplate: Resp, +) { + abstract suspend fun execute(headers: Headers): ClientStream + + /** + * A ClientStream is just like a RequestStream, except that closing + * the stream waits for the operation result. + * + * @param Req The request message type + * @param Resp The response message type + */ + interface ClientStream { + suspend fun send(req: Req) + suspend fun closeAndReceive(): ResponseMessage + + companion object { + fun new(underlying: ClientOnlyStreamInterface): ClientStream { + return object : ClientStream { + override suspend fun send(req: Req) { + underlying.send(req) + } + + override suspend fun closeAndReceive(): ResponseMessage { + try { + val resp = underlying.receiveAndClose() + return ResponseMessage.Success( + message = resp, + code = Code.OK, + headers = underlying.responseHeaders().await(), + trailers = underlying.responseTrailers().await(), + ) + } catch (e: Exception) { + val connectException: ConnectException + if (e is ConnectException) { + connectException = e + } else { + connectException = ConnectException(code = Code.UNKNOWN, exception = e) + } + return ResponseMessage.Failure( + cause = connectException, + code = connectException.code, + headers = underlying.responseHeaders().await(), + trailers = underlying.responseTrailers().await(), + ) + } + } + } + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt new file mode 100644 index 00000000..08ab3419 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt @@ -0,0 +1,23 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +interface Invoker { + fun unaryClient(): UnaryClient<*, *> + fun unimplementedClient(): UnaryClient<*, *> + fun clientStreamClient(): ClientStreamClient<*, *> + fun serverStreamClient(): ServerStreamClient<*, *> + fun bidiStreamClient(): BidiStreamClient<*, *> +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt new file mode 100644 index 00000000..c9f1b343 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt @@ -0,0 +1,42 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.BidirectionalStreamInterface +import com.google.protobuf.MessageLite + +/** + * RequestStream is a stream that allows a client to upload + * zero or more request messages. When the client is done + * sending messages, it must close the stream. + */ +interface RequestStream { + suspend fun send(req: Req) + fun close() + + companion object { + fun new(underlying: BidirectionalStreamInterface): RequestStream { + return object : RequestStream { + override suspend fun send(req: Req) { + underlying.send(req) + } + + override fun close() { + underlying.sendClose() + } + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt new file mode 100644 index 00000000..cb401ab4 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt @@ -0,0 +1,82 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.BidirectionalStreamInterface +import com.connectrpc.Headers +import com.connectrpc.ServerOnlyStreamInterface +import com.google.protobuf.MessageLite +import kotlinx.coroutines.channels.ReceiveChannel + +/** + * ResponseStream is a stream that allows a client to download + * zero or more request messages. Typically, the client should + * keep receiving messages until the end of the stream is reached. + * If the client closes the response stream before consuming all + * messages, the associated streaming RPC operation is cancelled. + * + * @param Resp The response message type + */ +interface ResponseStream { + fun messages(): ReceiveChannel + + suspend fun headers(): Headers + + suspend fun trailers(): Headers + + fun close() + + companion object { + fun new(underlying: BidirectionalStreamInterface): ResponseStream { + return object : ResponseStream { + override fun messages(): ReceiveChannel { + return underlying.responseChannel() + } + + override suspend fun headers(): Headers { + return underlying.responseHeaders().await() + } + + override suspend fun trailers(): Headers { + return underlying.responseTrailers().await() + } + + override fun close() { + underlying.receiveClose() + } + } + } + + fun new(underlying: ServerOnlyStreamInterface): ResponseStream { + return object : ResponseStream { + override fun messages(): ReceiveChannel { + return underlying.responseChannel() + } + + override suspend fun headers(): Headers { + return underlying.responseHeaders().await() + } + + override suspend fun trailers(): Headers { + return underlying.responseTrailers().await() + } + + override fun close() { + underlying.receiveClose() + } + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt new file mode 100644 index 00000000..621b4776 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt @@ -0,0 +1,33 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.Headers +import com.google.protobuf.MessageLite + +/** + * The client of a server-stream RPC operation. A server-stream + * operation allows the client to send a single request and then + * download zero or more response messages. + * + * @param Req The request message type + * @param Resp The response message type + */ +abstract class ServerStreamClient( + val reqTemplate: Req, + val respTemplate: Resp, +) { + abstract suspend fun execute(req: Req, headers: Headers): ResponseStream +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt new file mode 100644 index 00000000..d7ca2463 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt @@ -0,0 +1,83 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.http.Cancelable +import com.google.protobuf.MessageLite +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch + +/** + * The client of a unary RPC operation. This provides multiple ways + * to invoke the RPC: suspend-based async, callback-based async, or + * blocking. + * + * @param Req The request message type + * @param Resp The response message type + */ +abstract class UnaryClient( + val reqTemplate: Req, + val respTemplate: Resp, +) { + abstract suspend fun execute(req: Req, headers: Headers): ResponseMessage + + abstract fun execute(req: Req, headers: Headers, onFinish: (ResponseMessage) -> Unit): Cancelable + + abstract fun blocking(req: Req, headers: Headers): UnaryBlockingCall + + suspend fun execute( + style: InvokeStyle, + req: Req, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + when (style) { + InvokeStyle.CALLBACK -> { + return execute(req, headers, onFinish) + } + InvokeStyle.SUSPEND -> { + return coroutineScope { + val job = launch { + onFinish(execute(req, headers)) + } + return@coroutineScope { + job.cancel() + } + } + } + InvokeStyle.BLOCKING -> { + val call = blocking(req, headers) + coroutineScope { + launch(Dispatchers.IO) { + onFinish(call.execute()) + } + } + return { + call.cancel() + } + } + } + } + + enum class InvokeStyle { + CALLBACK, + SUSPEND, + BLOCKING, + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index c1b81f94..41438876 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,5 +1,8 @@ rootProject.name = "connect-kotlin" +include(":conformance:client") +include(":conformance:client:google-java") +include(":conformance:client:google-javalite") include(":conformance:common") include(":conformance:google-java") include(":conformance:google-javalite")