Skip to content

Commit

Permalink
add header usage to conformance test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump committed Dec 7, 2023
1 parent 5e5a061 commit 756c3b0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,14 @@ import java.util.concurrent.TimeUnit

@RunWith(Parameterized::class)
class ConformanceTest(
protocol: NetworkProtocol,
private val protocol: NetworkProtocol,
serverType: ServerType,
) : BaseConformanceTest(protocol, serverType) {
companion object {
private val responseHeaders = mapOf(Pair("x-grpc-test-echo-initial", listOf("test_initial_metadata_value")))
private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL")), /* base64-encoded 0x0a0b0a0b0a0b */)
private val requestHeaders = responseHeaders + responseTrailers
}

private lateinit var unimplementedServiceClient: UnimplementedServiceClient
private lateinit var testServiceConnectClient: TestServiceClient
Expand All @@ -68,20 +73,23 @@ class ConformanceTest(
@Test
fun serverStreaming(): Unit = runBlocking {
val sizes = listOf(512_000, 16, 2_028, 65_536)
val stream = testServiceConnectClient.streamingOutputCall()
val stream = testServiceConnectClient.streamingOutputCall(requestHeaders)
val params = sizes.map { responseParameters { size = it } }.toList()
stream.sendAndClose(
streamingOutputCallRequest {
responseType = PayloadType.COMPRESSABLE
responseParameters += params
},
).getOrThrow()
assertThat(stream.responseHeaders().await()).containsAllEntriesOf(responseHeaders)
val responses = mutableListOf<StreamingOutputCallResponse>()
for (response in stream.responseChannel()) {
responses.add(response)
}
assertThat(responses.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE))
assertThat(responses.map { it.payload.body.size() }).isEqualTo(sizes)
assertThat(stream.responseTrailers().isCompleted).isTrue()
assertThat(stream.responseTrailers().await()).containsAllEntriesOf(responseTrailers)
}

@Test
Expand Down Expand Up @@ -673,7 +681,7 @@ class ConformanceTest(

@Test
fun clientStreaming(): Unit = runBlocking {
val stream = testServiceConnectClient.streamingInputCall(emptyMap())
val stream = testServiceConnectClient.streamingInputCall()
var sum = 0
listOf(256000, 8, 1024, 32768).forEach { size ->
stream.send(
Expand All @@ -692,6 +700,14 @@ class ConformanceTest(
try {
val response = stream.receiveAndClose()
assertThat(response.aggregatedPayloadSize).isEqualTo(sum)
assertThat(stream.responseHeaders().isCompleted).isTrue()
assertThat(stream.responseTrailers().isCompleted).isTrue()
assertThat(stream.responseHeaders().await()).isNotEmpty()
if (protocol != NetworkProtocol.CONNECT) {
// gRPC and gRPC-web communicate RPC status in trailers, so
// they should always have something.
assertThat(stream.responseTrailers().await()).isNotEmpty()
}
} finally {
countDownLatch.countDown()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,14 @@ import java.util.concurrent.TimeUnit

@RunWith(Parameterized::class)
class ConformanceTest(
protocol: NetworkProtocol,
private val protocol: NetworkProtocol,
serverType: ServerType,
) : BaseConformanceTest(protocol, serverType) {
companion object {
private val responseHeaders = mapOf(Pair("x-grpc-test-echo-initial", listOf("test_initial_metadata_value")))
private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL")), /* base64-encoded 0x0a0b0a0b0a0b */)
private val requestHeaders = responseHeaders + responseTrailers
}

private lateinit var unimplementedServiceClient: UnimplementedServiceClient
private lateinit var testServiceConnectClient: TestServiceClient
Expand All @@ -68,20 +73,23 @@ class ConformanceTest(
@Test
fun serverStreaming(): Unit = runBlocking {
val sizes = listOf(512_000, 16, 2_028, 65_536)
val stream = testServiceConnectClient.streamingOutputCall()
val stream = testServiceConnectClient.streamingOutputCall(requestHeaders)
val params = sizes.map { responseParameters { size = it } }.toList()
stream.sendAndClose(
streamingOutputCallRequest {
responseType = PayloadType.COMPRESSABLE
responseParameters += params
},
).getOrThrow()
assertThat(stream.responseHeaders().await()).containsAllEntriesOf(responseHeaders)
val responses = mutableListOf<StreamingOutputCallResponse>()
for (response in stream.responseChannel()) {
responses.add(response)
}
assertThat(responses.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE))
assertThat(responses.map { it.payload.body.size() }).isEqualTo(sizes)
assertThat(stream.responseTrailers().isCompleted).isTrue()
assertThat(stream.responseTrailers().await()).containsAllEntriesOf(responseTrailers)
}

@Test
Expand Down Expand Up @@ -673,7 +681,7 @@ class ConformanceTest(

@Test
fun clientStreaming(): Unit = runBlocking {
val stream = testServiceConnectClient.streamingInputCall(emptyMap())
val stream = testServiceConnectClient.streamingInputCall()
var sum = 0
listOf(256000, 8, 1024, 32768).forEach { size ->
stream.send(
Expand All @@ -692,6 +700,14 @@ class ConformanceTest(
try {
val response = stream.receiveAndClose()
assertThat(response.aggregatedPayloadSize).isEqualTo(sum)
assertThat(stream.responseHeaders().isCompleted).isTrue()
assertThat(stream.responseTrailers().isCompleted).isTrue()
assertThat(stream.responseHeaders().await()).isNotEmpty()
if (protocol != NetworkProtocol.CONNECT) {
// gRPC and gRPC-web communicate RPC status in trailers, so
// they should always have something.
assertThat(stream.responseTrailers().await()).isNotEmpty()
}
} finally {
countDownLatch.countDown()
}
Expand Down

0 comments on commit 756c3b0

Please sign in to comment.