diff --git a/Makefile b/Makefile index 6e546674..270282c8 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,8 @@ generate: $(PROTOC) buildplugin generateconformance generateexamples ## Generate .PHONY: generateconformance generateconformance: $(PROTOC) buildplugin ## Generate protofiles for conformance tests. buf generate --template conformance/buf.gen.yaml -o conformance conformance/proto + buf generate --template conformance/client/buf.gen.yaml -o conformance/client buf.build/connectrpc/conformance:v1.0.0-rc1 + buf generate --template conformance/client/buf.gen.lite.yaml -o conformance/client buf.build/connectrpc/conformance:v1.0.0-rc1 .PHONY: generateexamples generateexamples: $(PROTOC) buildplugin ## Generate proto files for example apps. diff --git a/conformance/client/buf.gen.lite.yaml b/conformance/client/buf.gen.lite.yaml new file mode 100644 index 00000000..21df319f --- /dev/null +++ b/conformance/client/buf.gen.lite.yaml @@ -0,0 +1,20 @@ +version: v1 +managed: + enabled: true + java_package_prefix: com.connectrpc.lite +plugins: + - plugin: connect-kotlin + out: build/generated/sources/bufgen + path: ./protoc-gen-connect-kotlin/build/install/protoc-gen-connect-kotlin/bin/protoc-gen-connect-kotlin + opt: + - generateCallbackMethods=true + - generateCoroutineMethods=true + - generateBlockingUnaryMethods=true + - plugin: java + out: build/generated/sources/bufgen + protoc_path: .tmp/bin/protoc + opt: lite + - plugin: kotlin + out: build/generated/sources/bufgen + protoc_path: .tmp/bin/protoc + opt: lite diff --git a/conformance/client/buf.gen.yaml b/conformance/client/buf.gen.yaml new file mode 100644 index 00000000..51f0f04e --- /dev/null +++ b/conformance/client/buf.gen.yaml @@ -0,0 +1,17 @@ +version: v1 +managed: + enabled: true +plugins: + - plugin: connect-kotlin + out: google-java/build/generated/sources/bufgen + path: ./protoc-gen-connect-kotlin/build/install/protoc-gen-connect-kotlin/bin/protoc-gen-connect-kotlin + opt: + - generateCallbackMethods=true + - generateCoroutineMethods=true + - generateBlockingUnaryMethods=true + - plugin: java + out: google-java/build/generated/sources/bufgen + protoc_path: .tmp/bin/protoc + - plugin: kotlin + out: google-java/build/generated/sources/bufgen + protoc_path: .tmp/bin/protoc diff --git a/conformance/client/build.gradle.kts b/conformance/client/build.gradle.kts new file mode 100644 index 00000000..21cabbc9 --- /dev/null +++ b/conformance/client/build.gradle.kts @@ -0,0 +1,30 @@ +plugins { + kotlin("jvm") +} + +// This base project contains generated code for the lite runtime +// and depends on the Google Protobuf Java Lite runtime. +// The main client logic is implemented in terms of generated +// code for that lite runtime. +// +// The non-lite runtime excludes the Google Protobuf Java Lite +// runtime and instead uses the full Java runtime. It then can +// adapt from the lite-runtime-generated code by serializing to +// bytes and then de-serializing into non-lite-generated types. + +sourceSets { + main { + java { + srcDir("build/generated/sources/bufgen") + } + } +} + +dependencies { + implementation(project(":okhttp")) + implementation(libs.kotlin.coroutines.core) + implementation(libs.protobuf.kotlin) + implementation(libs.protobuf.javalite) + implementation(libs.okio.core) + implementation(libs.okhttp.tls) +} diff --git a/conformance/client/google-java/build.gradle.kts b/conformance/client/google-java/build.gradle.kts new file mode 100644 index 00000000..844bae92 --- /dev/null +++ b/conformance/client/google-java/build.gradle.kts @@ -0,0 +1,35 @@ +plugins { + kotlin("jvm") +} + +tasks { + compileKotlin { + kotlinOptions { + // Generated Kotlin code for protobufs uses OptIn annotation + freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn" + } + } +} + +// This project contains an alternate copy of the generated +// types, generated for the non-lite runtime. +sourceSets { + main { + java { + srcDir("build/generated/sources/bufgen") + } + } +} + +dependencies { + implementation(project(":conformance:client")) { + exclude(group = "com.google.protobuf", module = "protobuf-javalite") + } + implementation(project(":extensions:google-java")) + implementation(project(":okhttp")) + implementation(libs.kotlin.coroutines.core) + implementation(libs.protobuf.kotlin) + implementation(libs.protobuf.java) + implementation(libs.okio.core) + implementation(libs.okhttp.tls) +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaBidiStreamClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaBidiStreamClient.kt new file mode 100644 index 00000000..70512fcf --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaBidiStreamClient.kt @@ -0,0 +1,32 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.v1.BidiStreamRequest +import com.connectrpc.conformance.v1.BidiStreamResponse +import com.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaBidiStreamClient( + private val client: ConformanceServiceClient, +) : BidiStreamClient( + BidiStreamRequest.getDefaultInstance(), + BidiStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(headers: Headers): BidiStream { + return BidiStream.new(client.bidiStream(headers)) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaClientStreamClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaClientStreamClient.kt new file mode 100644 index 00000000..b178947b --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaClientStreamClient.kt @@ -0,0 +1,32 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.conformance.v1.ClientStreamRequest +import com.connectrpc.conformance.v1.ClientStreamResponse +import com.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaClientStreamClient( + private val client: ConformanceServiceClient, +) : ClientStreamClient( + ClientStreamRequest.getDefaultInstance(), + ClientStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(headers: Headers): ClientStream { + return ClientStream.new(client.clientStream(headers)) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt new file mode 100644 index 00000000..297b4c31 --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaInvoker.kt @@ -0,0 +1,62 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.SerializationStrategy +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.conformance.client.adapt.Invoker +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.extensions.GoogleJavaJSONStrategy +import com.connectrpc.extensions.GoogleJavaProtobufStrategy +import com.connectrpc.impl.ProtocolClient +import com.connectrpc.lite.connectrpc.conformance.v1.Codec + +class JavaInvoker( + protocolClient: ProtocolClient, +) : Invoker { + private val client = ConformanceServiceClient(protocolClient) + override fun unaryClient(): UnaryClient<*, *> { + return JavaUnaryClient(client) + } + + override fun unimplementedClient(): UnaryClient<*, *> { + return JavaUnimplementedClient(client) + } + + override fun clientStreamClient(): ClientStreamClient<*, *> { + return JavaClientStreamClient(client) + } + + override fun serverStreamClient(): ServerStreamClient<*, *> { + return JavaServerStreamClient(client) + } + + override fun bidiStreamClient(): BidiStreamClient<*, *> { + return JavaBidiStreamClient(client) + } + + companion object { + fun serializationStrategy(codec: Codec): SerializationStrategy { + return when (codec) { + Codec.CODEC_PROTO -> GoogleJavaProtobufStrategy() + Codec.CODEC_JSON -> GoogleJavaJSONStrategy() + else -> throw RuntimeException("unsupported codec $codec") + } + } + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaServerStreamClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaServerStreamClient.kt new file mode 100644 index 00000000..e882a4ff --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaServerStreamClient.kt @@ -0,0 +1,35 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.ResponseStream +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.ServerStreamRequest +import com.connectrpc.conformance.v1.ServerStreamResponse + +class JavaServerStreamClient( + private val client: ConformanceServiceClient, +) : ServerStreamClient( + ServerStreamRequest.getDefaultInstance(), + ServerStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(req: ServerStreamRequest, headers: Headers): ResponseStream { + val stream = client.serverStream(headers) + stream.sendAndClose(req) + return ResponseStream.new(stream) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnaryClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnaryClient.kt new file mode 100644 index 00000000..f41a95a8 --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnaryClient.kt @@ -0,0 +1,47 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.UnaryRequest +import com.connectrpc.conformance.v1.UnaryResponse +import com.connectrpc.http.Cancelable + +class JavaUnaryClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + UnaryRequest.getDefaultInstance(), + UnaryResponse.getDefaultInstance(), +) { + override suspend fun execute(req: UnaryRequest, headers: Headers): ResponseMessage { + return client.unary(req, headers) + } + + override fun execute( + req: UnaryRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.unary(req, headers, onFinish) + } + + override fun blocking(req: UnaryRequest, headers: Headers): UnaryBlockingCall { + return client.unaryBlocking(req, headers) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnimplementedClient.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnimplementedClient.kt new file mode 100644 index 00000000..34a3981a --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaUnimplementedClient.kt @@ -0,0 +1,47 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.java + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.conformance.v1.UnimplementedRequest +import com.connectrpc.conformance.v1.UnimplementedResponse +import com.connectrpc.http.Cancelable + +class JavaUnimplementedClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + UnimplementedRequest.getDefaultInstance(), + UnimplementedResponse.getDefaultInstance(), +) { + override suspend fun execute(req: UnimplementedRequest, headers: Headers): ResponseMessage { + return client.unimplemented(req, headers) + } + + override fun execute( + req: UnimplementedRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.unimplemented(req, headers, onFinish) + } + + override fun blocking(req: UnimplementedRequest, headers: Headers): UnaryBlockingCall { + return client.unimplementedBlocking(req, headers) + } +} diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt new file mode 100644 index 00000000..afd07c80 --- /dev/null +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/Main.kt @@ -0,0 +1,19 @@ +package com.connectrpc.conformance.client.java + +import com.connectrpc.conformance.client.Client +import com.connectrpc.conformance.client.ConformanceClientLoop + +fun main(args: Array) { + try { + val invokeStyle = ConformanceClientLoop.parseArgs(args) + val client = Client( + invokerFactory = ::JavaInvoker, + serializationFactory = JavaInvoker::serializationStrategy, + invokeStyle = invokeStyle, + ) + ConformanceClientLoop.run(System.`in`, System.out, client) + } catch (e: Exception) { + e.printStackTrace(System.err) + Runtime.getRuntime().exit(1) + } +} diff --git a/conformance/client/google-javalite/build.gradle.kts b/conformance/client/google-javalite/build.gradle.kts new file mode 100644 index 00000000..aa6cf7b9 --- /dev/null +++ b/conformance/client/google-javalite/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + kotlin("jvm") +} + +dependencies { + implementation(project(":conformance:client")) + implementation(project(":extensions:google-javalite")) + implementation(project(":okhttp")) + implementation(libs.kotlin.coroutines.core) + implementation(libs.protobuf.kotlin) + implementation(libs.okio.core) + implementation(libs.okhttp.tls) +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt new file mode 100644 index 00000000..9cf9a6bf --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteBidiStreamClient.kt @@ -0,0 +1,32 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.lite.connectrpc.conformance.v1.BidiStreamRequest +import com.connectrpc.lite.connectrpc.conformance.v1.BidiStreamResponse +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaLiteBidiStreamClient( + private val client: ConformanceServiceClient, +) : BidiStreamClient( + BidiStreamRequest.getDefaultInstance(), + BidiStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(headers: Headers): BidiStream { + return BidiStream.new(client.bidiStream(headers)) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt new file mode 100644 index 00000000..61c8d048 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteClientStreamClient.kt @@ -0,0 +1,32 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.lite.connectrpc.conformance.v1.ClientStreamRequest +import com.connectrpc.lite.connectrpc.conformance.v1.ClientStreamResponse +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaLiteClientStreamClient( + private val client: ConformanceServiceClient, +) : ClientStreamClient( + ClientStreamRequest.getDefaultInstance(), + ClientStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(headers: Headers): ClientStream { + return ClientStream.new(client.clientStream(headers)) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt new file mode 100644 index 00000000..e5d758f2 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteInvoker.kt @@ -0,0 +1,61 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.SerializationStrategy +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.conformance.client.adapt.Invoker +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.extensions.GoogleJavaLiteProtobufStrategy +import com.connectrpc.impl.ProtocolClient +import com.connectrpc.lite.connectrpc.conformance.v1.Codec +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient + +class JavaLiteInvoker( + protocolClient: ProtocolClient, +) : Invoker { + private val client = ConformanceServiceClient(protocolClient) + override fun unaryClient(): UnaryClient<*, *> { + return JavaLiteUnaryClient(client) + } + + override fun unimplementedClient(): UnaryClient<*, *> { + return JavaLiteUnimplementedClient(client) + } + + override fun clientStreamClient(): ClientStreamClient<*, *> { + return JavaLiteClientStreamClient(client) + } + + override fun serverStreamClient(): ServerStreamClient<*, *> { + return JavaLiteServerStreamClient(client) + } + + override fun bidiStreamClient(): BidiStreamClient<*, *> { + return JavaLiteBidiStreamClient(client) + } + + companion object { + fun serializationStrategy(codec: Codec): SerializationStrategy { + return when (codec) { + Codec.CODEC_PROTO -> GoogleJavaLiteProtobufStrategy() + Codec.CODEC_JSON -> throw RuntimeException("Java Lite does not support JSON") + else -> throw RuntimeException("unsupported codec $codec") + } + } + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt new file mode 100644 index 00000000..a845dab2 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteServerStreamClient.kt @@ -0,0 +1,35 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.conformance.client.adapt.ResponseStream +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.lite.connectrpc.conformance.v1.ServerStreamRequest +import com.connectrpc.lite.connectrpc.conformance.v1.ServerStreamResponse + +class JavaLiteServerStreamClient( + private val client: ConformanceServiceClient, +) : ServerStreamClient( + ServerStreamRequest.getDefaultInstance(), + ServerStreamResponse.getDefaultInstance(), +) { + override suspend fun execute(req: ServerStreamRequest, headers: Headers): ResponseStream { + val stream = client.serverStream(headers) + stream.sendAndClose(req) + return ResponseStream.new(stream) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt new file mode 100644 index 00000000..a919a446 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnaryClient.kt @@ -0,0 +1,50 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.http.Cancelable +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.lite.connectrpc.conformance.v1.UnaryRequest +import com.connectrpc.lite.connectrpc.conformance.v1.UnaryResponse + +class JavaLiteUnaryClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + UnaryRequest.getDefaultInstance(), + UnaryResponse.getDefaultInstance(), +) { + override suspend fun execute( + req: UnaryRequest, + headers: Headers, + ): ResponseMessage { + return client.unary(req, headers) + } + + override fun execute( + req: UnaryRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.unary(req, headers, onFinish) + } + + override fun blocking(req: UnaryRequest, headers: Headers): UnaryBlockingCall { + return client.unaryBlocking(req, headers) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt new file mode 100644 index 00000000..a2ba8f11 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteUnimplementedClient.kt @@ -0,0 +1,47 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.http.Cancelable +import com.connectrpc.lite.connectrpc.conformance.v1.ConformanceServiceClient +import com.connectrpc.lite.connectrpc.conformance.v1.UnimplementedRequest +import com.connectrpc.lite.connectrpc.conformance.v1.UnimplementedResponse + +class JavaLiteUnimplementedClient( + private val client: ConformanceServiceClient, +) : UnaryClient( + UnimplementedRequest.getDefaultInstance(), + UnimplementedResponse.getDefaultInstance(), +) { + override suspend fun execute(req: UnimplementedRequest, headers: Headers): ResponseMessage { + return client.unimplemented(req, headers) + } + + override fun execute( + req: UnimplementedRequest, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + return client.unimplemented(req, headers, onFinish) + } + + override fun blocking(req: UnimplementedRequest, headers: Headers): UnaryBlockingCall { + return client.unimplementedBlocking(req, headers) + } +} diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt new file mode 100644 index 00000000..16cce508 --- /dev/null +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/Main.kt @@ -0,0 +1,19 @@ +package com.connectrpc.conformance.client.javalite + +import com.connectrpc.conformance.client.Client +import com.connectrpc.conformance.client.ConformanceClientLoop + +fun main(args: Array) { + try { + val invokeStyle = ConformanceClientLoop.parseArgs(args) + val client = Client( + invokerFactory = ::JavaLiteInvoker, + serializationFactory = JavaLiteInvoker::serializationStrategy, + invokeStyle = invokeStyle, + ) + ConformanceClientLoop.run(System.`in`, System.out, client) + } catch (e: Exception) { + e.printStackTrace(System.err) + Runtime.getRuntime().exit(1) + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt new file mode 100644 index 00000000..ccfd8214 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt @@ -0,0 +1,228 @@ +package com.connectrpc.conformance.client + +import com.connectrpc.ProtocolClientConfig +import com.connectrpc.RequestCompression +import com.connectrpc.ResponseMessage +import com.connectrpc.SerializationStrategy +import com.connectrpc.compression.GzipCompressionPool +import com.connectrpc.conformance.client.adapt.BidiStreamClient +import com.connectrpc.conformance.client.adapt.ClientStreamClient +import com.connectrpc.conformance.client.adapt.Invoker +import com.connectrpc.conformance.client.adapt.ServerStreamClient +import com.connectrpc.conformance.client.adapt.UnaryClient +import com.connectrpc.impl.ProtocolClient +import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatRequest +import com.connectrpc.lite.connectrpc.conformance.v1.ClientResponseResult +import com.connectrpc.lite.connectrpc.conformance.v1.Codec +import com.connectrpc.lite.connectrpc.conformance.v1.Compression +import com.connectrpc.lite.connectrpc.conformance.v1.HTTPVersion +import com.connectrpc.lite.connectrpc.conformance.v1.Protocol +import com.connectrpc.lite.connectrpc.conformance.v1.StreamType +import com.connectrpc.okhttp.ConnectOkHttpClient +import com.connectrpc.protocols.GETConfiguration +import com.connectrpc.protocols.NetworkProtocol +import com.google.protobuf.MessageLite +import okhttp3.OkHttpClient +import okhttp3.tls.HandshakeCertificates +import okhttp3.tls.HeldCertificate +import java.security.KeyFactory +import java.security.KeyPair +import java.security.cert.CertificateFactory +import java.security.cert.X509Certificate +import java.security.interfaces.RSAPrivateKey +import java.security.interfaces.RSAPublicKey +import java.security.spec.PKCS8EncodedKeySpec +import java.time.Duration +import java.util.Base64 +import kotlin.reflect.cast +import kotlin.reflect.safeCast + +class Client( + private val invokerFactory: (ProtocolClient) -> Invoker, + private val serializationFactory: (Codec) -> SerializationStrategy, + private val invokeStyle: UnaryClient.InvokeStyle, +) { + suspend fun handle(req: ClientCompatRequest): ClientResponseResult { + val invoker = invokerFactory(getProtocolClient(req)) + val service = req.service.orEmpty() + if (service != "connectrpc.conformance.v1.ConformanceService") { + throw RuntimeException("service $service is not known") + } + + return when (val method = req.method.orEmpty()) { + "Unary" -> handleUnary(invoker.unaryClient(), req) + // TODO: IdempotentUnary + "Unimplemented" -> handleUnary(invoker.unimplementedClient(), req) + "ClientStream" -> handleClient(invoker.clientStreamClient(), req) + "ServerStream" -> handleServer(invoker.serverStreamClient(), req) + "BidiStream" -> if (req.streamType == StreamType.STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM) { + handleFullDuplexBidi(invoker.bidiStreamClient(), req) + } else { + handleHalfDuplexBidi(invoker.bidiStreamClient(), req) + } + else -> throw RuntimeException("method $method is not known") + } + } + + private suspend fun handleUnary( + client: UnaryClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private suspend fun handleClient( + client: ClientStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private suspend fun handleServer( + client: ServerStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private suspend fun handleHalfDuplexBidi( + client: BidiStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private suspend fun handleFullDuplexBidi( + client: BidiStreamClient, + req: ClientCompatRequest, + ): ClientResponseResult { + TODO("implement me") + } + + private fun getProtocolClient(req: ClientCompatRequest): ProtocolClient { + // TODO: cache/re-use clients instead of creating a new one for every request + val serializationStrategy = serializationFactory(req.codec) + val scheme = if (req.serverTlsCert.isEmpty) "http" else "https" + val host = "$scheme://${req.host}:${req.port}" + var clientBuilder = OkHttpClient.Builder() + .protocols(listOf(asOkHttpProtocol(req.httpVersion))) + .connectTimeout(Duration.ofMinutes(1)) + if (!req.serverTlsCert.isEmpty) { + val certs = certs(req) + clientBuilder = clientBuilder.sslSocketFactory(certs.sslSocketFactory(), certs.trustManager) + } + if (req.hasTimeoutMs()) { + clientBuilder = clientBuilder.callTimeout(Duration.ofMillis(req.timeoutMs.toLong())) + } + + val getConfig = if (req.useGetHttpMethod) GETConfiguration.Enabled else GETConfiguration.Disabled + val requestCompression = + if (req.compression == Compression.COMPRESSION_GZIP) { + RequestCompression(10, GzipCompressionPool) + } else { + null + } + val compressionPools = + if (req.compression == Compression.COMPRESSION_GZIP) { + listOf(GzipCompressionPool) + } else { + emptyList() + } + return ProtocolClient( + httpClient = ConnectOkHttpClient(clientBuilder.build()), + ProtocolClientConfig( + host = host, + serializationStrategy = serializationStrategy, + networkProtocol = asNetworkProtocol(req.protocol), + getConfiguration = getConfig, + requestCompression = requestCompression, + compressionPools = compressionPools, + ), + ) + } + + private fun asNetworkProtocol(protocol: Protocol): NetworkProtocol { + return when (protocol) { + Protocol.PROTOCOL_CONNECT -> NetworkProtocol.CONNECT + Protocol.PROTOCOL_GRPC -> NetworkProtocol.GRPC + Protocol.PROTOCOL_GRPC_WEB -> NetworkProtocol.GRPC_WEB + else -> throw RuntimeException("unsupported protocol: $protocol") + } + } + + private fun asOkHttpProtocol(httpVersion: HTTPVersion): okhttp3.Protocol { + return when (httpVersion) { + HTTPVersion.HTTP_VERSION_1 -> okhttp3.Protocol.HTTP_1_1 + HTTPVersion.HTTP_VERSION_2 -> okhttp3.Protocol.HTTP_2 + else -> throw RuntimeException("unsupported HTTP version: $httpVersion") + } + } + + private fun certs(req: ClientCompatRequest): HandshakeCertificates { + val certificateAuthority = req.serverTlsCert.newInput().use { stream -> + CertificateFactory.getInstance("X.509").generateCertificate(stream) as X509Certificate + } + val result = HandshakeCertificates.Builder() + .addTrustedCertificate(certificateAuthority) + + if (!req.hasClientTlsCreds()) { + return result.build() + } + + val certificate = req.clientTlsCreds.cert.newInput().use { stream -> + CertificateFactory.getInstance("X.509").generateCertificate(stream) as X509Certificate + } + val publicKey = certificate.publicKey as RSAPublicKey + val privateKeyBytes = req.clientTlsCreds.key.newInput().bufferedReader().use { stream -> + val lines = stream.readLines().toMutableList() + // Remove BEGIN RSA PRIVATE KEY / END RSA PRIVATE KEY lines + lines.removeFirst() + lines.removeLast() + Base64.getDecoder().decode(lines.joinToString(separator = "")) + } + val privateKey = KeyFactory.getInstance("RSA") + .generatePrivate(PKCS8EncodedKeySpec(privateKeyBytes)) as RSAPrivateKey + if (publicKey.modulus != privateKey.modulus) { + throw Exception("key does not match cert") // or other error handling + } + return result + .heldCertificate(HeldCertificate(KeyPair(publicKey, privateKey), certificate)) + .build() + } + + private fun convert( + from: From, + template: To, + ): To { + val clazz = template::class + return clazz.safeCast(from) + ?: clazz.cast( + template + .newBuilderForType() + .mergeFrom(from.toByteString()) + .build(), + ) + } + + private fun convert( + from: ResponseMessage, + template: To, + ): ResponseMessage { + return when (from) { + is ResponseMessage.Success -> { + ResponseMessage.Success( + message = convert(from.message, template), + code = from.code, + headers = from.headers, + trailers = from.trailers, + ) + } + is ResponseMessage.Failure -> { + // Value does not actually contain a reference + // to response type, so we can just cast it. + @Suppress("UNCHECKED_CAST") + from as ResponseMessage + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt new file mode 100644 index 00000000..41976143 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/ConformanceClientLoop.kt @@ -0,0 +1,96 @@ +package com.connectrpc.conformance.client + +import com.connectrpc.conformance.client.adapt.UnaryClient.InvokeStyle +import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatRequest +import com.connectrpc.lite.connectrpc.conformance.v1.ClientCompatResponse +import com.connectrpc.lite.connectrpc.conformance.v1.ClientErrorResult +import com.google.protobuf.ByteString +import kotlinx.coroutines.runBlocking +import java.io.EOFException +import java.io.InputStream +import java.io.OutputStream + +class ConformanceClientLoop { + companion object { + fun run(input: InputStream, output: OutputStream, client: Client) = runBlocking { + // TODO: issue RPCs in parallel + while (true) { + var result: ClientCompatResponse + val req = readRequest(input) ?: return@runBlocking // end of stream + try { + val resp = client.handle(req) + result = ClientCompatResponse.newBuilder().setResponse(resp).build() + } catch (e: Exception) { + val msg = if (e.message.orEmpty() == "") { + e::class.qualifiedName + } else { + "${e::class.qualifiedName}: ${e.message}" + } + result = ClientCompatResponse.newBuilder() + .setError(ClientErrorResult.newBuilder().setMessage(msg)) + .build() + } + writeResponse(output, result) + } + } + + private fun readRequest(input: InputStream): ClientCompatRequest? { + val len = input.readInt() ?: return null + val data = input.readN(len) + ?: throw EOFException("unexpected EOF: read 0 of $len expected message bytes") + return ClientCompatRequest.parseFrom(ByteString.copyFrom(data)) + } + + private fun writeResponse(output: OutputStream, resp: ClientCompatResponse) { + val len = resp.serializedSize + val prefix = ByteArray(4) + prefix[0] = len.ushr(24).toByte() + prefix[1] = len.ushr(16).toByte() + prefix[2] = len.ushr(8).toByte() + prefix[3] = len.toByte() + output.write(prefix) + resp.writeTo(output) + } + + private fun InputStream.readN(len: Int): ByteArray? { + val bytes = ByteArray(len) + var offs = 0 + var remain = len + while (remain > 0) { + val n = this.read(bytes, offs, remain) + if (n == 0) { + if (offs == 0) { + return null + } + throw EOFException("unexpected EOF: read $offs of $len expected bytes") + } + offs += n + remain -= n + } + return bytes + } + + private fun InputStream.readInt(): Int? { + val bytes = this.readN(4) ?: return null + return bytes[0].toInt().shl(24) or + bytes[1].toInt().shl(16) or + bytes[2].toInt().shl(8) or + bytes[3].toInt() + } + + fun parseArgs(args: Array): InvokeStyle { + if (args.isEmpty()) { + return InvokeStyle.SUSPEND + } + if (args.size > 1) { + throw IllegalArgumentException("expecting exactly one args (invoke style), but got ${args.size}") + } + return when (args[0].lowercase()) { + "suspend" -> InvokeStyle.SUSPEND + "blocking" -> InvokeStyle.BLOCKING + "callback" -> InvokeStyle.CALLBACK + else -> throw IllegalArgumentException("expecting one args to be 'suspend', 'blocking', or 'callback', but got '${args[0]}'") + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt new file mode 100644 index 00000000..0865269b --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/BidiStreamClient.kt @@ -0,0 +1,65 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.BidirectionalStreamInterface +import com.connectrpc.Headers +import com.google.protobuf.MessageLite + +/** + * The client of a bidi-stream RPC operation. A bidi-stream + * operation allows the client to upload zero or more request + * messages and to download zero or more response messages. + * Furthermore, bidi-stream operations can be "full duplex", + * which means that sending requests can be interleaved with + * receiving responses (whereas other stream types always + * require sending the request(s) first, and then receiving + * the response(s)). + * + * @param Req The request message type + * @param Resp The response message type + */ +abstract class BidiStreamClient( + val reqTemplate: Req, + val respTemplate: Resp, +) { + abstract suspend fun execute(headers: Headers): BidiStream + + /** + * A BidiStream combines a request stream and a response stream. + * + * @param Req The request message type + * @param Resp The response message type + */ + interface BidiStream { + fun requests(): RequestStream + fun responses(): ResponseStream + companion object { + fun new(underlying: BidirectionalStreamInterface): BidiStream { + val reqStream = RequestStream.new(underlying) + val respStream = ResponseStream.new(underlying) + return object : BidiStream { + override fun requests(): RequestStream { + return reqStream + } + + override fun responses(): ResponseStream { + return respStream + } + } + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt new file mode 100644 index 00000000..6941c1d7 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt @@ -0,0 +1,86 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.ClientOnlyStreamInterface +import com.connectrpc.Code +import com.connectrpc.ConnectException +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.google.protobuf.MessageLite + +/** + * The client of a client-stream RPC operation. A client-stream + * operation allows the client to upload zero or more request + * messages and then receive either a single response or an + * error when done. + * + * @param Req The request message type + * @param Resp The response message type + */ + +abstract class ClientStreamClient( + val reqTemplate: Req, + val respTemplate: Resp, +) { + abstract suspend fun execute(headers: Headers): ClientStream + + /** + * A ClientStream is just like a RequestStream, except that closing + * the stream waits for the operation result. + * + * @param Req The request message type + * @param Resp The response message type + */ + interface ClientStream { + suspend fun send(req: Req) + suspend fun closeAndReceive(): ResponseMessage + + companion object { + fun new(underlying: ClientOnlyStreamInterface): ClientStream { + return object : ClientStream { + override suspend fun send(req: Req) { + underlying.send(req) + } + + override suspend fun closeAndReceive(): ResponseMessage { + try { + val resp = underlying.receiveAndClose() + return ResponseMessage.Success( + message = resp, + code = Code.OK, + headers = underlying.responseHeaders().await(), + trailers = underlying.responseTrailers().await(), + ) + } catch (e: Exception) { + val connectException: ConnectException + if (e is ConnectException) { + connectException = e + } else { + connectException = ConnectException(code = Code.UNKNOWN, exception = e) + } + return ResponseMessage.Failure( + cause = connectException, + code = connectException.code, + headers = underlying.responseHeaders().await(), + trailers = underlying.responseTrailers().await(), + ) + } + } + } + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt new file mode 100644 index 00000000..08ab3419 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/Invoker.kt @@ -0,0 +1,23 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +interface Invoker { + fun unaryClient(): UnaryClient<*, *> + fun unimplementedClient(): UnaryClient<*, *> + fun clientStreamClient(): ClientStreamClient<*, *> + fun serverStreamClient(): ServerStreamClient<*, *> + fun bidiStreamClient(): BidiStreamClient<*, *> +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt new file mode 100644 index 00000000..c9f1b343 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/RequestStream.kt @@ -0,0 +1,42 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.BidirectionalStreamInterface +import com.google.protobuf.MessageLite + +/** + * RequestStream is a stream that allows a client to upload + * zero or more request messages. When the client is done + * sending messages, it must close the stream. + */ +interface RequestStream { + suspend fun send(req: Req) + fun close() + + companion object { + fun new(underlying: BidirectionalStreamInterface): RequestStream { + return object : RequestStream { + override suspend fun send(req: Req) { + underlying.send(req) + } + + override fun close() { + underlying.sendClose() + } + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt new file mode 100644 index 00000000..cb401ab4 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ResponseStream.kt @@ -0,0 +1,82 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.BidirectionalStreamInterface +import com.connectrpc.Headers +import com.connectrpc.ServerOnlyStreamInterface +import com.google.protobuf.MessageLite +import kotlinx.coroutines.channels.ReceiveChannel + +/** + * ResponseStream is a stream that allows a client to download + * zero or more request messages. Typically, the client should + * keep receiving messages until the end of the stream is reached. + * If the client closes the response stream before consuming all + * messages, the associated streaming RPC operation is cancelled. + * + * @param Resp The response message type + */ +interface ResponseStream { + fun messages(): ReceiveChannel + + suspend fun headers(): Headers + + suspend fun trailers(): Headers + + fun close() + + companion object { + fun new(underlying: BidirectionalStreamInterface): ResponseStream { + return object : ResponseStream { + override fun messages(): ReceiveChannel { + return underlying.responseChannel() + } + + override suspend fun headers(): Headers { + return underlying.responseHeaders().await() + } + + override suspend fun trailers(): Headers { + return underlying.responseTrailers().await() + } + + override fun close() { + underlying.receiveClose() + } + } + } + + fun new(underlying: ServerOnlyStreamInterface): ResponseStream { + return object : ResponseStream { + override fun messages(): ReceiveChannel { + return underlying.responseChannel() + } + + override suspend fun headers(): Headers { + return underlying.responseHeaders().await() + } + + override suspend fun trailers(): Headers { + return underlying.responseTrailers().await() + } + + override fun close() { + underlying.receiveClose() + } + } + } + } +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt new file mode 100644 index 00000000..621b4776 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ServerStreamClient.kt @@ -0,0 +1,33 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.Headers +import com.google.protobuf.MessageLite + +/** + * The client of a server-stream RPC operation. A server-stream + * operation allows the client to send a single request and then + * download zero or more response messages. + * + * @param Req The request message type + * @param Resp The response message type + */ +abstract class ServerStreamClient( + val reqTemplate: Req, + val respTemplate: Resp, +) { + abstract suspend fun execute(req: Req, headers: Headers): ResponseStream +} diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt new file mode 100644 index 00000000..d7ca2463 --- /dev/null +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/UnaryClient.kt @@ -0,0 +1,83 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.conformance.client.adapt + +import com.connectrpc.Headers +import com.connectrpc.ResponseMessage +import com.connectrpc.UnaryBlockingCall +import com.connectrpc.http.Cancelable +import com.google.protobuf.MessageLite +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch + +/** + * The client of a unary RPC operation. This provides multiple ways + * to invoke the RPC: suspend-based async, callback-based async, or + * blocking. + * + * @param Req The request message type + * @param Resp The response message type + */ +abstract class UnaryClient( + val reqTemplate: Req, + val respTemplate: Resp, +) { + abstract suspend fun execute(req: Req, headers: Headers): ResponseMessage + + abstract fun execute(req: Req, headers: Headers, onFinish: (ResponseMessage) -> Unit): Cancelable + + abstract fun blocking(req: Req, headers: Headers): UnaryBlockingCall + + suspend fun execute( + style: InvokeStyle, + req: Req, + headers: Headers, + onFinish: (ResponseMessage) -> Unit, + ): Cancelable { + when (style) { + InvokeStyle.CALLBACK -> { + return execute(req, headers, onFinish) + } + InvokeStyle.SUSPEND -> { + return coroutineScope { + val job = launch { + onFinish(execute(req, headers)) + } + return@coroutineScope { + job.cancel() + } + } + } + InvokeStyle.BLOCKING -> { + val call = blocking(req, headers) + coroutineScope { + launch(Dispatchers.IO) { + onFinish(call.execute()) + } + } + return { + call.cancel() + } + } + } + } + + enum class InvokeStyle { + CALLBACK, + SUSPEND, + BLOCKING, + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index c1b81f94..41438876 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,5 +1,8 @@ rootProject.name = "connect-kotlin" +include(":conformance:client") +include(":conformance:client:google-java") +include(":conformance:client:google-javalite") include(":conformance:common") include(":conformance:google-java") include(":conformance:google-javalite")