diff --git a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt index 800ad92e..da8dd503 100644 --- a/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt +++ b/conformance/google-java/src/test/kotlin/com/connectrpc/conformance/java/ConformanceTest.kt @@ -51,9 +51,14 @@ import java.util.concurrent.TimeUnit @RunWith(Parameterized::class) class ConformanceTest( - protocol: NetworkProtocol, + private val protocol: NetworkProtocol, serverType: ServerType, ) : BaseConformanceTest(protocol, serverType) { + companion object { + private val responseHeaders = mapOf(Pair("x-grpc-test-echo-initial", listOf("test_initial_metadata_value"))) + private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL")), /* base64-encoded 0x0a0b0a0b0a0b */) + private val requestHeaders = responseHeaders + responseTrailers + } private lateinit var unimplementedServiceClient: UnimplementedServiceClient private lateinit var testServiceConnectClient: TestServiceClient @@ -68,7 +73,7 @@ class ConformanceTest( @Test fun serverStreaming(): Unit = runBlocking { val sizes = listOf(512_000, 16, 2_028, 65_536) - val stream = testServiceConnectClient.streamingOutputCall() + val stream = testServiceConnectClient.streamingOutputCall(requestHeaders) val params = sizes.map { responseParameters { size = it } }.toList() stream.sendAndClose( streamingOutputCallRequest { @@ -76,12 +81,15 @@ class ConformanceTest( responseParameters += params }, ).getOrThrow() + assertThat(stream.responseHeaders().await()).containsAllEntriesOf(responseHeaders) val responses = mutableListOf() for (response in stream.responseChannel()) { responses.add(response) } assertThat(responses.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE)) assertThat(responses.map { it.payload.body.size() }).isEqualTo(sizes) + assertThat(stream.responseTrailers().isCompleted).isTrue() + assertThat(stream.responseTrailers().await()).containsAllEntriesOf(responseTrailers) } @Test @@ -673,7 +681,7 @@ class ConformanceTest( @Test fun clientStreaming(): Unit = runBlocking { - val stream = testServiceConnectClient.streamingInputCall(emptyMap()) + val stream = testServiceConnectClient.streamingInputCall() var sum = 0 listOf(256000, 8, 1024, 32768).forEach { size -> stream.send( @@ -692,6 +700,14 @@ class ConformanceTest( try { val response = stream.receiveAndClose() assertThat(response.aggregatedPayloadSize).isEqualTo(sum) + assertThat(stream.responseHeaders().isCompleted).isTrue() + assertThat(stream.responseTrailers().isCompleted).isTrue() + assertThat(stream.responseHeaders().await()).isNotEmpty() + if (protocol != NetworkProtocol.CONNECT) { + // gRPC and gRPC-web communicate RPC status in trailers, so + // they should always have something. + assertThat(stream.responseTrailers().await()).isNotEmpty() + } } finally { countDownLatch.countDown() } diff --git a/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt b/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt index 3c8e552a..e4b91fb1 100644 --- a/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt +++ b/conformance/google-javalite/src/test/kotlin/com/connectrpc/conformance/javalite/ConformanceTest.kt @@ -51,9 +51,14 @@ import java.util.concurrent.TimeUnit @RunWith(Parameterized::class) class ConformanceTest( - protocol: NetworkProtocol, + private val protocol: NetworkProtocol, serverType: ServerType, ) : BaseConformanceTest(protocol, serverType) { + companion object { + private val responseHeaders = mapOf(Pair("x-grpc-test-echo-initial", listOf("test_initial_metadata_value"))) + private val responseTrailers = mapOf(Pair("x-grpc-test-echo-trailing-bin", listOf("CgsKCwoL")), /* base64-encoded 0x0a0b0a0b0a0b */) + private val requestHeaders = responseHeaders + responseTrailers + } private lateinit var unimplementedServiceClient: UnimplementedServiceClient private lateinit var testServiceConnectClient: TestServiceClient @@ -68,7 +73,7 @@ class ConformanceTest( @Test fun serverStreaming(): Unit = runBlocking { val sizes = listOf(512_000, 16, 2_028, 65_536) - val stream = testServiceConnectClient.streamingOutputCall() + val stream = testServiceConnectClient.streamingOutputCall(requestHeaders) val params = sizes.map { responseParameters { size = it } }.toList() stream.sendAndClose( streamingOutputCallRequest { @@ -76,12 +81,15 @@ class ConformanceTest( responseParameters += params }, ).getOrThrow() + assertThat(stream.responseHeaders().await()).containsAllEntriesOf(responseHeaders) val responses = mutableListOf() for (response in stream.responseChannel()) { responses.add(response) } assertThat(responses.map { it.payload.type }.toSet()).isEqualTo(setOf(PayloadType.COMPRESSABLE)) assertThat(responses.map { it.payload.body.size() }).isEqualTo(sizes) + assertThat(stream.responseTrailers().isCompleted).isTrue() + assertThat(stream.responseTrailers().await()).containsAllEntriesOf(responseTrailers) } @Test @@ -673,7 +681,7 @@ class ConformanceTest( @Test fun clientStreaming(): Unit = runBlocking { - val stream = testServiceConnectClient.streamingInputCall(emptyMap()) + val stream = testServiceConnectClient.streamingInputCall() var sum = 0 listOf(256000, 8, 1024, 32768).forEach { size -> stream.send( @@ -692,6 +700,14 @@ class ConformanceTest( try { val response = stream.receiveAndClose() assertThat(response.aggregatedPayloadSize).isEqualTo(sum) + assertThat(stream.responseHeaders().isCompleted).isTrue() + assertThat(stream.responseTrailers().isCompleted).isTrue() + assertThat(stream.responseHeaders().await()).isNotEmpty() + if (protocol != NetworkProtocol.CONNECT) { + // gRPC and gRPC-web communicate RPC status in trailers, so + // they should always have something. + assertThat(stream.responseTrailers().await()).isNotEmpty() + } } finally { countDownLatch.countDown() }