Skip to content

Commit

Permalink
Enhance verbose output of conformance client and enable stream confor…
Browse files Browse the repository at this point in the history
…mance tests (#212)

The big change here is that the stream conformance test cases now
run in CI (and thus actually work!).

This also adds verbose output features to the conformance client. There are
now 5 levels of verbosity.
1. Prints the start and end of each test case. If an error occurs,
prints the exception's stack trace.
2. Prints exception stack traces for all RPC errors encountered and any
errors that occur when calling `send`. (These could be expected errors,
for tests of error conditions, which is why they are not enabled at
verbosity 1).
3. Prints the full messages read from stdin and results written to
stdout (in protobuf text format). Also adds a tracing decorator to the
`HTTPClientInterface` implementation, to print out each step in an RPC.
4. Adds an okhttp event logger, to trace each step in the underlying
HTTP framework.
5. Turns on stack-trace logging for the above okhttp event logger and
`HTTPClientInterface` tracing, so that each step shows its full stack
trace (useful since some operations can happen from different threads
and control flow and source of events can be non-obvious).
  • Loading branch information
jhump authored Jan 31, 2024
1 parent 3204d07 commit 328110c
Show file tree
Hide file tree
Showing 19 changed files with 409 additions and 53 deletions.
16 changes: 6 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,12 @@ runconformancenew: generate $(CONNECT_CONFORMANCE) ## Run the new conformance te
--known-failing conformance/client/known-failing-unary-cases.txt -- \
conformance/client/google-java/build/install/google-java/bin/google-java \
--style blocking

# TODO: Add streaming conformance tests. Currently, a small number of the test cases
# are flaky, so leaving this commented out for now.
# (Will continue investigating and address soon).
# $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/lite-stream-config.yaml \
# --known-failing conformance/client/known-failing-stream-cases.txt -- \
# conformance/client/google-javalite/build/install/google-javalite/bin/google-javalite
# $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/standard-stream-config.yaml \
# --known-failing conformance/client/known-failing-stream-cases.txt -- \
# conformance/client/google-java/build/install/google-java/bin/google-java
$(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/lite-stream-config.yaml \
--known-failing conformance/client/known-failing-stream-cases.txt -- \
conformance/client/google-javalite/build/install/google-javalite/bin/google-javalite
$(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/standard-stream-config.yaml \
--known-failing conformance/client/known-failing-stream-cases.txt -- \
conformance/client/google-java/build/install/google-java/bin/google-java

.PHONY: runcrosstests
runcrosstests: generate ## Run the old cross-test suite.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ class JavaHelpers {
if (err != null) {
respBuilder.setError(toProtoError(err))
}
builder.setResponse(respBuilder)
val respMsg = respBuilder.build()
result.response.raw = respMsg
builder.setResponse(respMsg)
}
is ClientCompatResponse.Result.ErrorResult -> {
builder.setError(
Expand Down Expand Up @@ -166,6 +168,8 @@ class JavaHelpers {
private class ClientCompatRequestImpl(
private val msg: com.connectrpc.conformance.v1.ClientCompatRequest,
) : ClientCompatRequest {
override val raw: kotlin.Any
get() = msg
override val testName: String
get() = msg.testName
override val service: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fun main(args: Array<String>) {
val loop = ConformanceClientLoop(
JavaHelpers::unmarshalRequest,
JavaHelpers::marshalResponse,
clientArgs.verbosity,
clientArgs.verbose,
)
val client = Client(
args = clientArgs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ class JavaLiteHelpers {
if (err != null) {
respBuilder.setError(toProtoError(err))
}
builder.setResponse(respBuilder)
val respMsg = respBuilder.build()
result.response.raw = respMsg
builder.setResponse(respMsg)
}
is ClientCompatResponse.Result.ErrorResult -> {
builder.setError(
Expand Down Expand Up @@ -148,6 +150,8 @@ class JavaLiteHelpers {
private class ClientCompatRequestImpl(
private val msg: com.connectrpc.conformance.v1.ClientCompatRequest,
) : ClientCompatRequest {
override val raw: kotlin.Any
get() = msg
override val testName: String
get() = msg.testName
override val service: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fun main(args: Array<String>) {
val loop = ConformanceClientLoop(
JavaLiteHelpers::unmarshalRequest,
JavaLiteHelpers::marshalResponse,
clientArgs.verbosity,
clientArgs.verbose,
)
val client = Client(
args = clientArgs,
Expand Down
17 changes: 4 additions & 13 deletions conformance/client/known-failing-stream-cases.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
# OkHttp seems to have a bug where timeout is not properly
# enforced when request body is full-duplex.
# We currently rely on OkHttp's "call timeout" to handle
# RPC deadlines, but that is not enforced when the request
# body is duplex. So timeouts don't currently work with
# bidi streams.
Timeouts/HTTPVersion:2/**/bidi half duplex timeout
Timeouts/HTTPVersion:2/**/bidi full duplex timeout

# Connect-kotlin does not have a way to limit the size of messages
# received. It probably should. Despite this, many cases in this suite
# still pass, so they are likely not exercising what we think they are.
# TODO: add flag to config yaml for whether implementation supports
# a receive size limit
Client Message Size/**/Compression:COMPRESSION_GZIP/TLS:false/**/client stream first request exceeds client limit
Client Message Size/**/Compression:COMPRESSION_GZIP/TLS:false/**/client stream subsequent request exceeds client limit
Client Message Size/**/Compression:COMPRESSION_GZIP/TLS:false/**/client stream all requests equal to client limit
Client Message Size/**/Compression:COMPRESSION_GZIP/TLS:false/**/server stream request equal to client limit
Client Message Size/**/Compression:COMPRESSION_GZIP/TLS:false/**/server stream request exceeds client limit
1 change: 1 addition & 0 deletions conformance/client/lite-stream-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ features:
- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
# TODO: get client certs working and uncomment this
#supportsTlsClientCerts: true
supportsMessageReceiveLimit: false
1 change: 1 addition & 0 deletions conformance/client/lite-unary-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ features:
- STREAM_TYPE_UNARY
# TODO: get client certs working and uncomment this
#supportsTlsClientCerts: true
supportsMessageReceiveLimit: false
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.connectrpc.conformance.client.adapt.Invoker
import com.connectrpc.conformance.client.adapt.ResponseStream
import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.client.adapt.UnaryClient
import com.connectrpc.http.HTTPClientInterface
import com.connectrpc.impl.ProtocolClient
import com.connectrpc.okhttp.ConnectOkHttpClient
import com.connectrpc.protocols.GETConfiguration
Expand All @@ -45,6 +46,7 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
import okhttp3.Protocol
import okhttp3.tls.HandshakeCertificates
import okhttp3.tls.HeldCertificate
import java.security.KeyFactory
Expand Down Expand Up @@ -172,6 +174,10 @@ class Client(
try {
stream.send(msg)
} catch (ex: Exception) {
args.verbose.verbosity(2) {
println("Failed to send request message:")
indent().println(ex.stackTraceToString())
}
numUnsent = req.requestMessages.size - i
break
}
Expand Down Expand Up @@ -277,6 +283,10 @@ class Client(
try {
stream.requests.send(msg)
} catch (ex: Exception) {
args.verbose.verbosity(2) {
println("Failed to send request message:")
indent().println(ex.stackTraceToString())
}
numUnsent = req.requestMessages.size - i
break
}
Expand Down Expand Up @@ -318,6 +328,10 @@ class Client(
try {
stream.requests.send(msg)
} catch (ex: Exception) {
args.verbose.verbosity(2) {
println("Failed to send request message:")
indent().println(ex.stackTraceToString())
}
// Ignore. We should see it again below when we receive the response.
}

Expand Down Expand Up @@ -447,6 +461,11 @@ class Client(
var clientBuilder = OkHttpClient.Builder()
.protocols(asOkHttpProtocols(req.httpVersion, useTls))
.connectTimeout(Duration.ofMinutes(1))

args.verbose.withPrefix("okhttp3 events: ").verbosity(4) {
clientBuilder = clientBuilder.eventListener(OkHttpEventTracer(this))
}

if (useTls) {
val certs = certs(req)
clientBuilder = clientBuilder.sslSocketFactory(certs.sslSocketFactory(), certs.trustManager)
Expand All @@ -469,10 +488,14 @@ class Client(
emptyList()
}
val httpClient = clientBuilder.build()
var connectHttpClient: HTTPClientInterface = ConnectOkHttpClient(httpClient)
args.verbose.withPrefix("http client interface: ").verbosity(3) {
connectHttpClient = TracingHTTPClient(connectHttpClient, this)
}
return Pair(
httpClient,
ProtocolClient(
httpClient = ConnectOkHttpClient(httpClient),
httpClient = connectHttpClient,
ProtocolClientConfig(
host = host,
serializationStrategy = serializationStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.connectrpc.conformance.client.adapt.UnaryClient.InvokeStyle

data class ClientArgs(
val invokeStyle: InvokeStyle,
val verbosity: Int,
val verbose: VerbosePrinter,
) {
companion object {
fun parseArgs(args: Array<String>): ClientArgs {
Expand Down Expand Up @@ -53,13 +53,16 @@ data class ClientArgs(
}
}
"-v" -> {
verbosity = 1
}
"-vv" -> {
verbosity = 2
}
"-vvv" -> {
verbosity = 3
if (i == args.size - 1) {
throw RuntimeException("$arg option requires a value")
}
skip = true // consuming next string now
val v = args[i + 1]
val intVal = v.toIntOrNull()
if (intVal == null || intVal < 1 || intVal > 5) {
throw RuntimeException("value for $arg option should be an integer between 1 and 5; instead got '$v'")
}
verbosity = intVal
}
else -> {
if (arg.startsWith("-")) {
Expand All @@ -70,7 +73,7 @@ data class ClientArgs(
}
}
}
return ClientArgs(invokeStyle, verbosity)
return ClientArgs(invokeStyle, VerbosePrinter(verbosity, "* client: "))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,43 @@ import java.io.OutputStream
class ConformanceClientLoop(
private val requestUnmarshaller: (ByteArray) -> ClientCompatRequest,
private val responseMarshaller: (ClientCompatResponse) -> ByteArray,
private val verbosity: Int = 0,
private val verbose: VerbosePrinter,
) {
fun run(input: InputStream, output: OutputStream, client: Client) = runBlocking {
// TODO: issue RPCs in parallel
while (true) {
var result: ClientCompatResponse.Result
val req = readRequest(input) ?: return@runBlocking // end of stream
if (verbosity > 0) {
System.err.println("* client: read request for test ${req.testName}")
verbose.verbosity(1) {
println("read request for test ${req.testName}")
verbose.verbosity(3) {
println("RPC request:")
indent().println("${req.raw}")
}
}
try {
val resp = client.handle(req)
result = ClientCompatResponse.Result.ResponseResult(resp)
if (verbosity > 0) {
System.err.println("* client: RPC completed for test ${req.testName}")
verbose.verbosity(1) {
println("RPC completed for test ${req.testName}")
}
} catch (e: Exception) {
if (verbosity > 0) {
System.err.println("* client: RPC could not be issued for test ${req.testName}")
e.printStackTrace()
} catch (ex: Exception) {
verbose.verbosity(1) {
println("RPC could not be issued for test ${req.testName}")
indent().println(ex.stackTraceToString())
}
val msg = if (e.message.orEmpty() == "") {
e::class.qualifiedName.orEmpty()
val msg = if (ex.message.orEmpty() == "") {
ex::class.qualifiedName.orEmpty()
} else {
"${e::class.qualifiedName}: ${e.message}"
"${ex::class.qualifiedName}: ${ex.message}"
}
result = ClientCompatResponse.Result.ErrorResult(msg)
}
if (result is ClientCompatResponse.Result.ResponseResult && result.response.error != null) {
if (verbosity > 2) {
verbose.verbosity(2) {
val ex = result.response.error!!
System.err.println("* client: RPC failed with code ${ex.code}")
ex.printStackTrace()
println("RPC failed with code ${ex.code} for test ${req.testName}:")
indent().println(ex.stackTraceToString())
}
}
writeResponse(
Expand All @@ -72,6 +76,12 @@ class ConformanceClientLoop(
result = result,
),
)
if (result is ClientCompatResponse.Result.ResponseResult && result.response.raw != null) {
verbose.verbosity(3) {
println("RPC result:")
indent().println("${result.response.raw}")
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2022-2023 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.connectrpc.conformance.client

import com.connectrpc.okhttp.originalCode
import okhttp3.Call
import okhttp3.Connection
import okhttp3.EventListener
import okhttp3.Protocol
import okhttp3.Request
import okhttp3.Response
import java.io.IOException
import java.net.InetSocketAddress
import java.net.Proxy

internal class OkHttpEventTracer(
private val printer: VerbosePrinter.Printer,
) : EventListener() {
override fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy) {
printer.printlnWithStackTrace("connecting to $inetSocketAddress...")
}
override fun connectEnd(
call: Call,
inetSocketAddress: InetSocketAddress,
proxy: Proxy,
protocol: Protocol?,
) {
printer.printlnWithStackTrace("connected to $inetSocketAddress")
}
override fun connectFailed(
call: Call,
inetSocketAddress: InetSocketAddress,
proxy: Proxy,
protocol: Protocol?,
ioe: IOException,
) {
printer.printlnWithStackTrace("connect to $inetSocketAddress failed")
}
override fun connectionAcquired(call: Call, connection: Connection) {
printer.printlnWithStackTrace("connection to ${connection.socket().remoteSocketAddress} acquired")
}
override fun requestHeadersStart(call: Call) {
printer.printlnWithStackTrace("writing request headers...")
}
override fun requestHeadersEnd(call: Call, request: Request) {
printer.printlnWithStackTrace("request headers written")
}
override fun requestBodyStart(call: Call) {
printer.printlnWithStackTrace("writing request body...")
}
override fun requestBodyEnd(call: Call, byteCount: Long) {
printer.printlnWithStackTrace("request body written: $byteCount bytes")
}
override fun requestFailed(call: Call, ioe: IOException) {
printer.printlnWithStackTrace("request failed: ${ioe.message}")
}
override fun responseHeadersStart(call: Call) {
printer.printlnWithStackTrace("reading response headers...")
}
override fun responseHeadersEnd(call: Call, response: Response) {
printer.printlnWithStackTrace("response headers read: status code = ${response.originalCode()}")
}
override fun responseBodyStart(call: Call) {
printer.printlnWithStackTrace("reading response body...")
}
override fun responseBodyEnd(call: Call, byteCount: Long) {
printer.printlnWithStackTrace("response body read: $byteCount bytes")
}
override fun responseFailed(call: Call, ioe: IOException) {
printer.printlnWithStackTrace("response failed: ${ioe.message}")
}
}
Loading

0 comments on commit 328110c

Please sign in to comment.