Skip to content

Commit

Permalink
Add additional conformance streaming tests (#108)
Browse files Browse the repository at this point in the history
Add a server streaming successful test case and a ping/pong test case
for verifying bidirectional streams.
  • Loading branch information
pkwarren authored Sep 22, 2023
1 parent f4be592 commit ea77fd5
Showing 1 changed file with 57 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,57 @@ class Conformance(
unimplementedServiceClient = UnimplementedServiceClient(connectClient)
}

@Test
fun serverStreaming(): Unit = runBlocking {
val sizes = listOf(512_000, 16, 2_028, 65_536)
val stream = testServiceConnectClient.streamingOutputCall()
val params = sizes.map { responseParameters { size = it } }.toList()
stream.sendAndClose(
streamingOutputCallRequest {
responseType = PayloadType.COMPRESSABLE
responseParameters += params
},
).getOrThrow()
val results = streamResults(stream.resultChannel())
assertThat(results.error).isNull()
assertThat(results.code).isEqualTo(Code.OK)
assertThat(results.messages.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE))
assertThat(results.messages.map { it.payload.body.size() }).isEqualTo(sizes)
}

@Test
fun pingPong(): Unit = runBlocking {
val stream = testServiceConnectClient.fullDuplexCall()
var readHeaders = false
listOf(512_000, 16, 2_028, 65_536).forEach {
val param = responseParameters { size = it }
stream.send(
streamingOutputCallRequest {
responseType = PayloadType.COMPRESSABLE
responseParameters += param
},
).getOrThrow()
if (!readHeaders) {
val headersResult = stream.resultChannel().receive()
assertThat(headersResult).isInstanceOf(StreamResult.Headers::class.java)
readHeaders = true
}
val result = stream.resultChannel().receive()
assertThat(result).isInstanceOf(StreamResult.Message::class.java)
val messageResult = result as StreamResult.Message
val payload = messageResult.message.payload
assertThat(payload.type).isEqualTo(PayloadType.COMPRESSABLE)
assertThat(payload.body).hasSize(it)
}
stream.sendClose()
val results = streamResults(stream.resultChannel())
// We've already read all the messages
assertThat(results.messages).isEmpty()
assertThat(results.error).isNull()
assertThat(results.code).isEqualTo(Code.OK)
stream.receiveClose()
}

@Test
fun failServerStreaming(): Unit = runBlocking {
val expectedErrorDetail = errorDetail {
Expand All @@ -184,7 +235,6 @@ class Conformance(
intervalUs = index * 10
}
}

stream.sendAndClose(
streamingOutputCallRequest {
responseParameters.addAll(parameters)
Expand Down Expand Up @@ -216,41 +266,22 @@ class Conformance(

@Test
fun emptyUnary(): Unit = runBlocking {
val countDownLatch = CountDownLatch(1)
testServiceConnectClient.emptyCall(empty {}) { response ->
response.failure {
fail<Unit>("expected error to be null")
}
response.success { success ->
assertThat(success.message).isEqualTo(empty {})
countDownLatch.countDown()
}
}
countDownLatch.await(500, TimeUnit.MILLISECONDS)
assertThat(countDownLatch.count).isZero()
val response = testServiceConnectClient.emptyCall(empty {}).getOrThrow()
assertThat(response).isEqualTo(empty {})
}

@Test
fun largeUnary(): Unit = runBlocking {
val size = 314159
val message = simpleRequest {
responseType = PayloadType.COMPRESSABLE
responseSize = size
payload = payload {
body = ByteString.copyFrom(ByteArray(size))
}
}
val countDownLatch = CountDownLatch(1)
testServiceConnectClient.unaryCall(message) { response ->
response.failure {
fail<Unit>("expected error to be null")
}
response.success { success ->
assertThat(success.message.payload?.body?.toByteArray()?.size).isEqualTo(size)
countDownLatch.countDown()
}
}
countDownLatch.await(500, TimeUnit.MILLISECONDS)
assertThat(countDownLatch.count).isZero()
val response = testServiceConnectClient.unaryCall(message).getOrThrow()
assertThat(response.payload.body.toByteArray()).hasSize(size)
}

@Test
Expand Down Expand Up @@ -775,7 +806,7 @@ class Conformance(
countDownLatch.countDown()
}
}
countDownLatch.await(5, TimeUnit.MINUTES)
countDownLatch.await(5, TimeUnit.SECONDS)
job.cancel()
assertThat(countDownLatch.count).isZero()
}
Expand Down

0 comments on commit ea77fd5

Please sign in to comment.