Skip to content

Commit

Permalink
get server stream operations working - some strange errors with grpc …
Browse files Browse the repository at this point in the history
…server impl + gzip...
  • Loading branch information
jhump committed Jan 12, 2024
1 parent 9f82a3d commit 0df45eb
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 21 deletions.
2 changes: 1 addition & 1 deletion conformance/client/lite-stream-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ features:
streamTypes:
# This config file only runs stream RPC test cases.
- STREAM_TYPE_CLIENT_STREAM
#- STREAM_TYPE_SERVER_STREAM
- STREAM_TYPE_SERVER_STREAM
#- STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
#- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
#supportsTlsClientCerts: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.connectrpc.conformance.client

import com.connectrpc.Code
import com.connectrpc.ConnectException
import com.connectrpc.Headers
import com.connectrpc.ProtocolClientConfig
import com.connectrpc.RequestCompression
import com.connectrpc.ResponseMessage
Expand Down Expand Up @@ -197,7 +199,48 @@ class Client(
if (req.streamType != StreamType.SERVER_STREAM) {
throw RuntimeException("specified method ${req.method} is server-stream but stream type indicates ${req.streamType}")
}
TODO("implement me")
if (req.requestMessages.size != 1) {
throw RuntimeException("server-stream calls should indicate exactly one request message, got ${req.requestMessages.size}")
}
if (req.cancel != null &&
req.cancel !is Cancel.AfterCloseSendMs &&
req.cancel !is Cancel.AfterNumResponses
) {
throw RuntimeException("server stream calls can only support `AfterCloseSendMs` and 'AfterNumResponses' cancellation field, instead got ${req.cancel!!::class.simpleName}")
}
val msg = fromAny(req.requestMessages[0], client.reqTemplate, SERVER_STREAM_REQUEST_NAME)
val stream = client.execute(msg, req.requestHeaders)
val cancel = req.cancel
if (cancel is Cancel.AfterCloseSendMs) {
delay(cancel.millis.toLong())
stream.close()
}
val payloads : MutableList<MessageLite> = mutableListOf()
var connEx : ConnectException? = null
var trailers : Headers
try {
if (cancel is Cancel.AfterNumResponses && cancel.num == 0) {
stream.close()
}
for (resp in stream.messages()) {
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && payloads.size == cancel.num) {
stream.close()
}
}
trailers = stream.trailers()
} catch (ex: ConnectException) {
connEx = ex
trailers = ex.metadata
} finally {
stream.close()
}
return ClientResponseResult(
headers = stream.headers(),
payloads = payloads,
error = connEx,
trailers = trailers,
)
}

private suspend fun <Req : MessageLite, Resp : MessageLite> handleBidi(
Expand Down Expand Up @@ -245,10 +288,6 @@ class Client(
if (result.code != result.cause.code) {
throw RuntimeException("RPC result has mismatching codes: ${result.code} != ${result.cause.code}")
}
if (args.verbosity > 2) {
System.err.println("* client: RPC failed with code ${result.code}")
result.cause.printStackTrace()
}
ClientResponseResult(
headers = result.headers,
error = result.cause,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ class ConformanceClientLoop(
}
result = ClientCompatResponse.Result.ErrorResult(msg)
}
if (result is ClientCompatResponse.Result.ResponseResult && result.response.error != null) {
if (verbosity > 2) {
val ex = result.response.error!!
System.err.println("* client: RPC failed with code ${ex.code}")
ex.printStackTrace()
}
}
writeResponse(
output,
ClientCompatResponse(
Expand Down
2 changes: 1 addition & 1 deletion conformance/client/standard-stream-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ features:
streamTypes:
# This config file only runs stream RPC test cases.
- STREAM_TYPE_CLIENT_STREAM
#- STREAM_TYPE_SERVER_STREAM
- STREAM_TYPE_SERVER_STREAM
#- STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
#- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
#supportsTlsClientCerts: true
Expand Down
23 changes: 9 additions & 14 deletions okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal fun OkHttpClient.initializeStream(
}
val callRequest = builder.build()
val call = newCall(callRequest)
call.enqueue(ResponseCallback(call, onResult, isReceiveClosed))
call.enqueue(ResponseCallback(onResult))
return Stream(
onSend = { buffer ->
if (!isSendClosed.get()) {
Expand All @@ -82,9 +82,7 @@ internal fun OkHttpClient.initializeStream(
}

private class ResponseCallback(
private val call: Call,
private val onResult: suspend (StreamResult<Buffer>) -> Unit,
private val isClosed: AtomicBoolean,
) : Callback {
override fun onFailure(call: Call, e: IOException) {
runBlocking {
Expand All @@ -111,7 +109,7 @@ private class ResponseCallback(
resp.body!!.source().use { sourceBuffer ->
var exception: Exception? = null
try {
while (!sourceBuffer.safeExhausted() && !isClosed.get()) {
while (!sourceBuffer.exhausted()) {
val buffer = readStream(sourceBuffer)
val streamResult = StreamResult.Message(
message = buffer,
Expand All @@ -135,21 +133,18 @@ private class ResponseCallback(
}
}

private fun BufferedSource.safeExhausted(): Boolean {
return try {
exhausted()
} catch (e: StreamResetException) {
true
}
}

private fun Response.safeTrailers(): Map<String, List<String>>? {
return try {
if (body?.source()?.safeExhausted() == false) {
try {
if (body?.source()?.exhausted() == false) {
// Assuming this means that trailers are not available.
// Returning null to signal trailers are "missing".
return null
}
} catch (e: Exception) {
return null
}

return try {
trailers().toLowerCaseKeysMultiMap()
} catch (_: Throwable) {
// Something went terribly wrong.
Expand Down

0 comments on commit 0df45eb

Please sign in to comment.