From ea77fd50f526893cb9fa56be7eda9422277b1ac7 Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Fri, 22 Sep 2023 13:10:50 -0500 Subject: [PATCH] Add additional conformance streaming tests (#108) Add a server streaming successful test case and a ping/pong test case for verifying bidirectional streams. --- .../com/connectrpc/conformance/Conformance.kt | 83 +++++++++++++------ 1 file changed, 57 insertions(+), 26 deletions(-) diff --git a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/Conformance.kt b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/Conformance.kt index 888555a5..19a43b7d 100644 --- a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/Conformance.kt +++ b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/Conformance.kt @@ -165,6 +165,57 @@ class Conformance( unimplementedServiceClient = UnimplementedServiceClient(connectClient) } + @Test + fun serverStreaming(): Unit = runBlocking { + val sizes = listOf(512_000, 16, 2_028, 65_536) + val stream = testServiceConnectClient.streamingOutputCall() + val params = sizes.map { responseParameters { size = it } }.toList() + stream.sendAndClose( + streamingOutputCallRequest { + responseType = PayloadType.COMPRESSABLE + responseParameters += params + }, + ).getOrThrow() + val results = streamResults(stream.resultChannel()) + assertThat(results.error).isNull() + assertThat(results.code).isEqualTo(Code.OK) + assertThat(results.messages.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE)) + assertThat(results.messages.map { it.payload.body.size() }).isEqualTo(sizes) + } + + @Test + fun pingPong(): Unit = runBlocking { + val stream = testServiceConnectClient.fullDuplexCall() + var readHeaders = false + listOf(512_000, 16, 2_028, 65_536).forEach { + val param = responseParameters { size = it } + stream.send( + streamingOutputCallRequest { + responseType = PayloadType.COMPRESSABLE + responseParameters += param + }, + ).getOrThrow() + if (!readHeaders) { + val headersResult = stream.resultChannel().receive() + assertThat(headersResult).isInstanceOf(StreamResult.Headers::class.java) + readHeaders = true + } + val result = stream.resultChannel().receive() + assertThat(result).isInstanceOf(StreamResult.Message::class.java) + val messageResult = result as StreamResult.Message + val payload = messageResult.message.payload + assertThat(payload.type).isEqualTo(PayloadType.COMPRESSABLE) + assertThat(payload.body).hasSize(it) + } + stream.sendClose() + val results = streamResults(stream.resultChannel()) + // We've already read all the messages + assertThat(results.messages).isEmpty() + assertThat(results.error).isNull() + assertThat(results.code).isEqualTo(Code.OK) + stream.receiveClose() + } + @Test fun failServerStreaming(): Unit = runBlocking { val expectedErrorDetail = errorDetail { @@ -184,7 +235,6 @@ class Conformance( intervalUs = index * 10 } } - stream.sendAndClose( streamingOutputCallRequest { responseParameters.addAll(parameters) @@ -216,41 +266,22 @@ class Conformance( @Test fun emptyUnary(): Unit = runBlocking { - val countDownLatch = CountDownLatch(1) - testServiceConnectClient.emptyCall(empty {}) { response -> - response.failure { - fail("expected error to be null") - } - response.success { success -> - assertThat(success.message).isEqualTo(empty {}) - countDownLatch.countDown() - } - } - countDownLatch.await(500, TimeUnit.MILLISECONDS) - assertThat(countDownLatch.count).isZero() + val response = testServiceConnectClient.emptyCall(empty {}).getOrThrow() + assertThat(response).isEqualTo(empty {}) } @Test fun largeUnary(): Unit = runBlocking { val size = 314159 val message = simpleRequest { + responseType = PayloadType.COMPRESSABLE responseSize = size payload = payload { body = ByteString.copyFrom(ByteArray(size)) } } - val countDownLatch = CountDownLatch(1) - testServiceConnectClient.unaryCall(message) { response -> - response.failure { - fail("expected error to be null") - } - response.success { success -> - assertThat(success.message.payload?.body?.toByteArray()?.size).isEqualTo(size) - countDownLatch.countDown() - } - } - countDownLatch.await(500, TimeUnit.MILLISECONDS) - assertThat(countDownLatch.count).isZero() + val response = testServiceConnectClient.unaryCall(message).getOrThrow() + assertThat(response.payload.body.toByteArray()).hasSize(size) } @Test @@ -775,7 +806,7 @@ class Conformance( countDownLatch.countDown() } } - countDownLatch.await(5, TimeUnit.MINUTES) + countDownLatch.await(5, TimeUnit.SECONDS) job.cancel() assertThat(countDownLatch.count).isZero() }