Skip to content

Commit

Permalink
get full-duplex bidi stream operations working - timeouts not working…
Browse files Browse the repository at this point in the history
… for any bidi 🤔
  • Loading branch information
jhump committed Jan 12, 2024
1 parent 798285b commit 3758032
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 3 deletions.
2 changes: 1 addition & 1 deletion conformance/client/lite-stream-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ features:
- STREAM_TYPE_CLIENT_STREAM
- STREAM_TYPE_SERVER_STREAM
- STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
#- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,79 @@ class Client(
client: BidiStreamClient<Req, Resp>,
req: ClientCompatRequest,
): ClientResponseResult {
TODO("implement me")
val stream = client.execute(req.requestHeaders)
val cancel = req.cancel
val payloads : MutableList<MessageLite> = mutableListOf()
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
delay(req.requestDelayMs.toLong())
}
val msg = fromAny(req.requestMessages[i], client.reqTemplate, BIDI_STREAM_REQUEST_NAME)
try {
stream.requests.send(msg)
} catch (_: Exception) {
// Ignore. We should see it again below when we receive the response.
}

// In full-duplex mode, we read the response after writing request,
// to interleave the requests and responses.
if (i == 0 && cancel is Cancel.AfterNumResponses && cancel.num == 0) {
stream.responses.close()
}
try {
val resp = stream.responses.messages.receive()
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) {
stream.responses.close()
}
} catch (ex: ConnectException) {
return ClientResponseResult(
headers = stream.responses.headers(),
payloads = payloads,
error = ex,
trailers = ex.metadata,
numUnsentRequests = req.requestMessages.size - i,
)
}
}
when (cancel) {
is Cancel.BeforeCloseSend -> {
stream.responses.close() // cancel
stream.requests.close() // close send
}
is Cancel.AfterCloseSendMs -> {
stream.requests.close() // close send
delay(cancel.millis.toLong())
stream.responses.close() // cancel
}
else -> {
stream.requests.close() // close send
}
}

// Drain the response, in case there are any other messages.
var connEx : ConnectException? = null
var trailers : Headers
try {
for (resp in stream.responses.messages) {
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) {
stream.responses.close()
}
}
trailers = stream.responses.trailers()
} catch (ex: ConnectException) {
connEx = ex
trailers = ex.metadata
} finally {
stream.responses.close()
}
return ClientResponseResult(
headers = stream.responses.headers(),
payloads = payloads,
error = connEx,
trailers = trailers,
)
}

private fun unaryResult(numUnsent: Int, result: ResponseMessage<out MessageLite>): ClientResponseResult {
Expand Down
2 changes: 1 addition & 1 deletion conformance/client/standard-stream-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ features:
- STREAM_TYPE_CLIENT_STREAM
- STREAM_TYPE_SERVER_STREAM
- STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
#- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM

0 comments on commit 3758032

Please sign in to comment.